From 59e8dca499276ecb7543fde8ac2e57b9bd97b622 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Mon, 21 Sep 2020 21:35:50 +0200 Subject: [PATCH] sources/ldap: divide connector into password, sync and auth, add unittests for password --- passbook/sources/ldap/auth.py | 56 +++++- passbook/sources/ldap/password.py | 155 ++++++++++++++++ passbook/sources/ldap/signals.py | 12 +- .../sources/ldap/{connector.py => sync.py} | 172 +----------------- passbook/sources/ldap/tasks.py | 10 +- passbook/sources/ldap/tests.py | 149 --------------- passbook/sources/ldap/tests/__init__.py | 0 passbook/sources/ldap/tests/test_auth.py | 47 +++++ passbook/sources/ldap/tests/test_password.py | 54 ++++++ passbook/sources/ldap/tests/test_sync.py | 51 ++++++ passbook/sources/ldap/tests/utils.py | 93 ++++++++++ swagger.yaml | 28 +-- 12 files changed, 485 insertions(+), 342 deletions(-) create mode 100644 passbook/sources/ldap/password.py rename passbook/sources/ldap/{connector.py => sync.py} (51%) delete mode 100644 passbook/sources/ldap/tests.py create mode 100644 passbook/sources/ldap/tests/__init__.py create mode 100644 passbook/sources/ldap/tests/test_auth.py create mode 100644 passbook/sources/ldap/tests/test_password.py create mode 100644 passbook/sources/ldap/tests/test_sync.py create mode 100644 passbook/sources/ldap/tests/utils.py diff --git a/passbook/sources/ldap/auth.py b/passbook/sources/ldap/auth.py index 9ee4407e1..dde9a7237 100644 --- a/passbook/sources/ldap/auth.py +++ b/passbook/sources/ldap/auth.py @@ -1,9 +1,12 @@ """passbook LDAP Authentication Backend""" +from typing import Optional + +import ldap3 from django.contrib.auth.backends import ModelBackend from django.http import HttpRequest from structlog import get_logger -from passbook.sources.ldap.connector import Connector +from passbook.core.models import User from passbook.sources.ldap.models import LDAPSource LOGGER = get_logger() @@ -18,7 +21,56 @@ class LDAPBackend(ModelBackend): return None for source in LDAPSource.objects.filter(enabled=True): LOGGER.debug("LDAP Auth attempt", source=source) - user = Connector(source).auth_user(**kwargs) + user = self.auth_user(source, **kwargs) if user: return user return None + + def auth_user( + self, source: LDAPSource, password: str, **filters: str + ) -> Optional[User]: + """Try to bind as either user_dn or mail with password. + Returns True on success, otherwise False""" + users = User.objects.filter(**filters) + if not users.exists(): + return None + user: User = users.first() + if "distinguishedName" not in user.attributes: + LOGGER.debug( + "User doesn't have DN set, assuming not LDAP imported.", user=user + ) + return None + # Either has unusable password, + # or has a password, but couldn't be authenticated by ModelBackend. + # This means we check with a bind to see if the LDAP password has changed + if self.auth_user_by_bind(source, user, password): + # Password given successfully binds to LDAP, so we save it in our Database + LOGGER.debug("Updating user's password in DB", user=user) + user.set_password(password, signal=False) + user.save() + return user + # Password doesn't match + LOGGER.debug("Failed to bind, password invalid") + return None + + def auth_user_by_bind( + self, source: LDAPSource, user: User, password: str + ) -> Optional[User]: + """Attempt authentication by binding to the LDAP server as `user`. This + method should be avoided as its slow to do the bind.""" + # Try to bind as new user + LOGGER.debug("Attempting Binding as user", user=user) + try: + temp_connection = ldap3.Connection( + source.connection.server, + user=user.attributes.get("distinguishedName"), + password=password, + raise_exceptions=True, + ) + temp_connection.bind() + return user + except ldap3.core.exceptions.LDAPInvalidCredentialsResult as exception: + LOGGER.debug("LDAPInvalidCredentialsResult", user=user, error=exception) + except ldap3.core.exceptions.LDAPException as exception: + LOGGER.warning(exception) + return None diff --git a/passbook/sources/ldap/password.py b/passbook/sources/ldap/password.py new file mode 100644 index 000000000..67d021ae4 --- /dev/null +++ b/passbook/sources/ldap/password.py @@ -0,0 +1,155 @@ +"""Help validate and update passwords in LDAP""" +from enum import IntFlag +from re import split +from typing import Optional + +import ldap3 +import ldap3.core.exceptions +from structlog import get_logger + +from passbook.core.models import User +from passbook.sources.ldap.models import LDAPSource + +LOGGER = get_logger() + +NON_ALPHA = r"~!@#$%^&*_-+=`|\(){}[]:;\"'<>,.?/" +RE_DISPLAYNAME_SEPARATORS = r",\.–—_\s#\t" + + +class PwdProperties(IntFlag): + """Possible values for the pwdProperties attribute""" + + DOMAIN_PASSWORD_COMPLEX = 1 + DOMAIN_PASSWORD_NO_ANON_CHANGE = 2 + DOMAIN_PASSWORD_NO_CLEAR_CHANGE = 4 + DOMAIN_LOCKOUT_ADMINS = 8 + DOMAIN_PASSWORD_STORE_CLEARTEXT = 16 + DOMAIN_REFUSE_PASSWORD_CHANGE = 32 + + +class PasswordCategories(IntFlag): + """Password categories as defined by Microsoft, a category can only be counted + once, hence intflag.""" + + NONE = 0 + ALPHA_LOWER = 1 + ALPHA_UPPER = 2 + ALPHA_OTHER = 4 + NUMERIC = 8 + SYMBOL = 16 + + +class LDAPPasswordChanger: + """Help validate and update passwords in LDAP""" + + _source: LDAPSource + + def __init__(self, source: LDAPSource) -> None: + self._source = source + + def get_domain_root_dn(self) -> str: + """Attempt to get root DN via MS specific fields or generic LDAP fields""" + info = self._source.connection.server.info + if "rootDomainNamingContext" in info.other: + return info.other["rootDomainNamingContext"][0] + naming_contexts = info.naming_contexts + naming_contexts.sort(key=len) + return naming_contexts[0] + + def check_ad_password_complexity_enabled(self) -> bool: + """Check if DOMAIN_PASSWORD_COMPLEX is enabled""" + root_dn = self.get_domain_root_dn() + root_attrs = self._source.connection.extend.standard.paged_search( + search_base=root_dn, + search_filter="(objectClass=*)", + search_scope=ldap3.BASE, + attributes=["pwdProperties"], + ) + root_attrs = list(root_attrs)[0] + pwd_properties = PwdProperties(root_attrs["attributes"]["pwdProperties"]) + if PwdProperties.DOMAIN_PASSWORD_COMPLEX in pwd_properties: + return True + + return False + + def change_password(self, user: User, password: str): + """Change user's password""" + user_dn = user.attributes.get("distinguishedName", None) + if not user_dn: + raise AttributeError("User has no distinguishedName set.") + self._source.connection.extend.microsoft.modify_password(user_dn, password) + + def _ad_check_password_existing(self, password: str, user_dn: str) -> bool: + """Check if a password contains sAMAccount or displayName""" + users = list( + self._source.connection.extend.standard.paged_search( + search_base=user_dn, + search_filter=self._source.user_object_filter, + search_scope=ldap3.BASE, + attributes=["displayName", "sAMAccountName"], + ) + ) + if len(users) != 1: + raise AssertionError() + user_attributes = users[0]["attributes"] + # If sAMAccountName is longer than 3 chars, check if its contained in password + if len(user_attributes["sAMAccountName"]) >= 3: + if password.lower() in user_attributes["sAMAccountName"].lower(): + return False + display_name_tokens = split( + RE_DISPLAYNAME_SEPARATORS, user_attributes["displayName"] + ) + for token in display_name_tokens: + # Ignore tokens under 3 chars + if len(token) < 3: + continue + if token.lower() in password.lower(): + return False + return True + + def ad_password_complexity( + self, password: str, user: Optional[User] = None + ) -> bool: + """Check if password matches Active direcotry password policies + + https://docs.microsoft.com/en-us/windows/security/threat-protection/ + security-policy-settings/password-must-meet-complexity-requirements + """ + if user: + # Check if password contains sAMAccountName or displayNames + if "distinguishedName" in user.attributes: + existing_user_check = self._ad_check_password_existing( + password, user.attributes.get("distinguishedName") + ) + if not existing_user_check: + LOGGER.debug("Password failed name check", user=user) + return existing_user_check + + # Step 2, match at least 3 of 5 categories + matched_categories = PasswordCategories.NONE + required = 3 + for letter in password: + # Only match one category per letter, + if letter.islower(): + matched_categories |= PasswordCategories.ALPHA_LOWER + elif letter.isupper(): + matched_categories |= PasswordCategories.ALPHA_UPPER + elif not letter.isascii() and letter.isalpha(): + # Not exactly matching microsoft's policy, but count it as "Other unicode" char + # when its alpha and not ascii + matched_categories |= PasswordCategories.ALPHA_OTHER + elif letter.isnumeric(): + matched_categories |= PasswordCategories.NUMERIC + elif letter in NON_ALPHA: + matched_categories |= PasswordCategories.SYMBOL + if bin(matched_categories).count("1") < required: + LOGGER.debug( + "Password didn't match enough categories", + has=matched_categories, + must=required, + ) + return False + LOGGER.debug( + "Password matched categories", has=matched_categories, must=required + ) + return True diff --git a/passbook/sources/ldap/signals.py b/passbook/sources/ldap/signals.py index 4c5e0e1db..3fc8ef12b 100644 --- a/passbook/sources/ldap/signals.py +++ b/passbook/sources/ldap/signals.py @@ -10,8 +10,8 @@ from ldap3.core.exceptions import LDAPException from passbook.core.models import User from passbook.core.signals import password_changed from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER -from passbook.sources.ldap.connector import Connector from passbook.sources.ldap.models import LDAPSource +from passbook.sources.ldap.password import LDAPPasswordChanger from passbook.sources.ldap.tasks import sync_single from passbook.stages.prompt.signals import password_validate @@ -32,9 +32,9 @@ def ldap_password_validate(sender, password: str, plan_context: Dict[str, Any], if not sources.exists(): return source = sources.first() - connector = Connector(source) - if connector.check_ad_password_complexity_enabled(): - passing = connector.ad_password_complexity( + changer = LDAPPasswordChanger(source) + if changer.check_ad_password_complexity_enabled(): + passing = changer.ad_password_complexity( password, plan_context.get(PLAN_CONTEXT_PENDING_USER, None) ) if not passing: @@ -52,8 +52,8 @@ def ldap_sync_password(sender, user: User, password: str, **_): if not sources.exists(): return source = sources.first() - connector = Connector(source) + changer = LDAPPasswordChanger(source) try: - connector.change_password(user, password) + changer.change_password(user, password) except LDAPException as exc: raise ValidationError("Failed to set password") from exc diff --git a/passbook/sources/ldap/connector.py b/passbook/sources/ldap/sync.py similarity index 51% rename from passbook/sources/ldap/connector.py rename to passbook/sources/ldap/sync.py index fc403890b..0d0f024da 100644 --- a/passbook/sources/ldap/connector.py +++ b/passbook/sources/ldap/sync.py @@ -1,7 +1,5 @@ -"""Wrapper for ldap3 to easily manage user""" -from enum import IntFlag -from re import split -from typing import Any, Dict, Optional +"""Sync LDAP Users and groups into passbook""" +from typing import Any, Dict import ldap3 import ldap3.core.exceptions @@ -14,23 +12,9 @@ from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource LOGGER = get_logger() -NON_ALPHA = r"~!@#$%^&*_-+=`|\(){}[]:;\"'<>,.?/" -RE_DISPLAYNAME_SEPARATORS = r",\.–—_\s#\t" - -class PwdProperties(IntFlag): - """Possible values for the pwdProperties attribute""" - - DOMAIN_PASSWORD_COMPLEX = 1 - DOMAIN_PASSWORD_NO_ANON_CHANGE = 2 - DOMAIN_PASSWORD_NO_CLEAR_CHANGE = 4 - DOMAIN_LOCKOUT_ADMINS = 8 - DOMAIN_PASSWORD_STORE_CLEARTEXT = 16 - DOMAIN_REFUSE_PASSWORD_CHANGE = 32 - - -class Connector: - """Wrapper for ldap3 to easily manage user authentication and creation""" +class LDAPSynchronizer: + """Sync LDAP Users and groups into passbook""" _source: LDAPSource @@ -198,151 +182,3 @@ class Connector: "distinguishedName" ) return properties - - def auth_user(self, password: str, **filters: str) -> Optional[User]: - """Try to bind as either user_dn or mail with password. - Returns True on success, otherwise False""" - users = User.objects.filter(**filters) - if not users.exists(): - return None - user: User = users.first() - if "distinguishedName" not in user.attributes: - LOGGER.debug( - "User doesn't have DN set, assuming not LDAP imported.", user=user - ) - return None - # Either has unusable password, - # or has a password, but couldn't be authenticated by ModelBackend. - # This means we check with a bind to see if the LDAP password has changed - if self.auth_user_by_bind(user, password): - # Password given successfully binds to LDAP, so we save it in our Database - LOGGER.debug("Updating user's password in DB", user=user) - user.set_password(password, signal=False) - user.save() - return user - # Password doesn't match - LOGGER.debug("Failed to bind, password invalid") - return None - - def auth_user_by_bind(self, user: User, password: str) -> Optional[User]: - """Attempt authentication by binding to the LDAP server as `user`. This - method should be avoided as its slow to do the bind.""" - # Try to bind as new user - LOGGER.debug("Attempting Binding as user", user=user) - try: - temp_connection = ldap3.Connection( - self._source.connection.server, - user=user.attributes.get("distinguishedName"), - password=password, - raise_exceptions=True, - ) - temp_connection.bind() - return user - except ldap3.core.exceptions.LDAPInvalidCredentialsResult as exception: - LOGGER.debug("LDAPInvalidCredentialsResult", user=user, error=exception) - except ldap3.core.exceptions.LDAPException as exception: - LOGGER.warning(exception) - return None - - def get_domain_root_dn(self) -> str: - """Attempt to get root DN via MS specific fields or generic LDAP fields""" - info = self._source.connection.server.info - if "rootDomainNamingContext" in info.other: - return info.other["rootDomainNamingContext"][0] - naming_contexts = info.naming_contexts - naming_contexts.sort(key=len) - return naming_contexts[0] - - def check_ad_password_complexity_enabled(self) -> bool: - """Check if DOMAIN_PASSWORD_COMPLEX is enabled""" - root_dn = self.get_domain_root_dn() - root_attrs = self._source.connection.extend.standard.paged_search( - search_base=root_dn, - search_filter="(objectClass=*)", - search_scope=ldap3.BASE, - attributes=["pwdProperties"], - ) - root_attrs = list(root_attrs)[0] - pwd_properties = PwdProperties(root_attrs["attributes"]["pwdProperties"]) - if PwdProperties.DOMAIN_PASSWORD_COMPLEX in pwd_properties: - return True - - return False - - def change_password(self, user: User, password: str): - """Change user's password""" - user_dn = user.attributes.get("distinguishedName", None) - if not user_dn: - raise AttributeError("User has no distinguishedName set.") - self._source.connection.extend.microsoft.modify_password(user_dn, password) - - def _ad_check_password_existing(self, password: str, user_dn: str) -> bool: - """Check if a password contains sAMAccount or displayName""" - users = self._source.connection.extend.standard.paged_search( - search_base=user_dn, - search_filter="(objectClass=*)", - search_scope=ldap3.BASE, - attributes=["displayName", "sAMAccountName"], - ) - if len(users) != 1: - raise AssertionError() - user = users[0] - # If sAMAccountName is longer than 3 chars, check if its contained in password - if len(user.sAMAccountName.value) >= 3: - if password.lower() in user.sAMAccountName.value.lower(): - return False - display_name_tokens = split(RE_DISPLAYNAME_SEPARATORS, user.displayName.value) - for token in display_name_tokens: - # Ignore tokens under 3 chars - if len(token) < 3: - continue - if token.lower() in password.lower(): - return False - return True - - def ad_password_complexity( - self, password: str, user: Optional[User] = None - ) -> bool: - """Check if password matches Active direcotry password policies - - https://docs.microsoft.com/en-us/windows/security/threat-protection/ - security-policy-settings/password-must-meet-complexity-requirements - """ - if user: - # Check if password contains sAMAccountName or displayNames - if "distinguishedName" in user.attributes: - existing_user_check = self._ad_check_password_existing( - password, user.attributes.get("distinguishedName") - ) - if not existing_user_check: - LOGGER.debug("Password failed name check", user=user) - return existing_user_check - - # Step 2, match at least 3 of 5 categories - matched_categories = 0 - required = 3 - for letter in password: - # Only match one category per letter, - if letter.islower(): - matched_categories += 1 - elif letter.isupper(): - matched_categories += 1 - elif not letter.isascii() and letter.isalpha(): - # Not exactly matching microsoft's policy, but count it as "Other unicode" char - # when its alpha and not ascii - matched_categories += 1 - elif letter.isnumeric(): - matched_categories += 1 - elif letter in NON_ALPHA: - matched_categories += 1 - if matched_categories < required: - LOGGER.debug( - "Password didn't match enough categories", - has=matched_categories, - must=required, - ) - return False - LOGGER.debug( - "Password matched categories", has=matched_categories, must=required - ) - return True diff --git a/passbook/sources/ldap/tasks.py b/passbook/sources/ldap/tasks.py index 69a18aed3..0e5b257f0 100644 --- a/passbook/sources/ldap/tasks.py +++ b/passbook/sources/ldap/tasks.py @@ -4,8 +4,8 @@ from time import time from django.core.cache import cache from passbook.root.celery import CELERY_APP -from passbook.sources.ldap.connector import Connector from passbook.sources.ldap.models import LDAPSource +from passbook.sources.ldap.sync import LDAPSynchronizer @CELERY_APP.task() @@ -19,9 +19,9 @@ def sync(): def sync_single(source_pk): """Sync a single source""" source: LDAPSource = LDAPSource.objects.get(pk=source_pk) - connector = Connector(source) - connector.sync_users() - connector.sync_groups() - connector.sync_membership() + syncer = LDAPSynchronizer(source) + syncer.sync_users() + syncer.sync_groups() + syncer.sync_membership() cache_key = source.state_cache_prefix("last_sync") cache.set(cache_key, time(), timeout=60 * 60) diff --git a/passbook/sources/ldap/tests.py b/passbook/sources/ldap/tests.py deleted file mode 100644 index 2cc29470d..000000000 --- a/passbook/sources/ldap/tests.py +++ /dev/null @@ -1,149 +0,0 @@ -"""LDAP Source tests""" -from unittest.mock import Mock, PropertyMock, patch - -from django.test import TestCase -from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server - -from passbook.core.models import Group, User -from passbook.providers.oauth2.generators import generate_client_secret -from passbook.sources.ldap.auth import LDAPBackend -from passbook.sources.ldap.connector import Connector -from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource -from passbook.sources.ldap.tasks import sync - - -def _build_mock_connection() -> Connection: - """Create mock connection""" - server = Server("my_fake_server", get_info=OFFLINE_AD_2012_R2) - _pass = "foo" # noqa # nosec - connection = Connection( - server, - user="cn=my_user,ou=test,o=lab", - password=_pass, - client_strategy=MOCK_SYNC, - ) - connection.strategy.add_entry( - "cn=group1,ou=groups,ou=test,o=lab", - { - "name": "test-group", - "objectSid": "unique-test-group", - "objectCategory": "Group", - "distinguishedName": "cn=group1,ou=groups,ou=test,o=lab", - }, - ) - # Group without SID - connection.strategy.add_entry( - "cn=group2,ou=groups,ou=test,o=lab", - { - "name": "test-group", - "objectCategory": "Group", - "distinguishedName": "cn=group2,ou=groups,ou=test,o=lab", - }, - ) - connection.strategy.add_entry( - "cn=user0,ou=users,ou=test,o=lab", - { - "userPassword": LDAP_PASSWORD, - "sAMAccountName": "user0_sn", - "name": "user0_sn", - "revision": 0, - "objectSid": "user0", - "objectCategory": "Person", - "memberOf": "cn=group1,ou=groups,ou=test,o=lab", - }, - ) - # User without SID - connection.strategy.add_entry( - "cn=user1,ou=users,ou=test,o=lab", - { - "userPassword": "test1111", - "sAMAccountName": "user2_sn", - "name": "user1_sn", - "revision": 0, - "objectCategory": "Person", - }, - ) - # Duplicate users - connection.strategy.add_entry( - "cn=user2,ou=users,ou=test,o=lab", - { - "userPassword": "test2222", - "sAMAccountName": "user2_sn", - "name": "user2_sn", - "revision": 0, - "objectSid": "unique-test2222", - "objectCategory": "Person", - }, - ) - connection.strategy.add_entry( - "cn=user3,ou=users,ou=test,o=lab", - { - "userPassword": "test2222", - "sAMAccountName": "user2_sn", - "name": "user2_sn", - "revision": 0, - "objectSid": "unique-test2222", - "objectCategory": "Person", - }, - ) - connection.bind() - return connection - - -LDAP_PASSWORD = generate_client_secret() -LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection()) - - -class LDAPSourceTests(TestCase): - """LDAP Source tests""" - - def setUp(self): - self.source = LDAPSource.objects.create( - name="ldap", - slug="ldap", - base_dn="ou=test,o=lab", - additional_user_dn="ou=users", - additional_group_dn="ou=groups", - ) - self.source.property_mappings.set(LDAPPropertyMapping.objects.all()) - self.source.save() - - @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) - def test_sync_users(self): - """Test user sync""" - connector = Connector(self.source) - connector.sync_users() - self.assertTrue(User.objects.filter(username="user0_sn").exists()) - self.assertFalse(User.objects.filter(username="user1_sn").exists()) - - @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) - def test_sync_groups(self): - """Test group sync""" - connector = Connector(self.source) - connector.sync_groups() - connector.sync_membership() - group = Group.objects.filter(name="test-group") - self.assertTrue(group.exists()) - - @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) - def test_auth(self): - """Test Cached auth""" - connector = Connector(self.source) - connector.sync_users() - - user = User.objects.get(username="user0_sn") - auth_user_by_bind = Mock(return_value=user) - with patch( - "passbook.sources.ldap.connector.Connector.auth_user_by_bind", - auth_user_by_bind, - ): - backend = LDAPBackend() - self.assertEqual( - backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD), - user, - ) - - @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) - def test_tasks(self): - """Test Scheduled tasks""" - sync() diff --git a/passbook/sources/ldap/tests/__init__.py b/passbook/sources/ldap/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/passbook/sources/ldap/tests/test_auth.py b/passbook/sources/ldap/tests/test_auth.py new file mode 100644 index 000000000..09eb30810 --- /dev/null +++ b/passbook/sources/ldap/tests/test_auth.py @@ -0,0 +1,47 @@ +"""LDAP Source tests""" +from unittest.mock import Mock, PropertyMock, patch + +from django.test import TestCase + +from passbook.core.models import User +from passbook.providers.oauth2.generators import generate_client_secret +from passbook.sources.ldap.auth import LDAPBackend +from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource +from passbook.sources.ldap.sync import LDAPSynchronizer +from passbook.sources.ldap.tests.utils import _build_mock_connection + +LDAP_PASSWORD = generate_client_secret() +LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD)) + + +class LDAPSyncTests(TestCase): + """LDAP Sync tests""" + + def setUp(self): + self.source = LDAPSource.objects.create( + name="ldap", + slug="ldap", + base_dn="DC=AD2012,DC=LAB", + additional_user_dn="ou=users", + additional_group_dn="ou=groups", + ) + self.source.property_mappings.set(LDAPPropertyMapping.objects.all()) + self.source.save() + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_auth_synced_user(self): + """Test Cached auth""" + syncer = LDAPSynchronizer(self.source) + syncer.sync_users() + + user = User.objects.get(username="user0_sn") + auth_user_by_bind = Mock(return_value=user) + with patch( + "passbook.sources.ldap.auth.LDAPBackend.auth_user_by_bind", + auth_user_by_bind, + ): + backend = LDAPBackend() + self.assertEqual( + backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD), + user, + ) diff --git a/passbook/sources/ldap/tests/test_password.py b/passbook/sources/ldap/tests/test_password.py new file mode 100644 index 000000000..6581c83dc --- /dev/null +++ b/passbook/sources/ldap/tests/test_password.py @@ -0,0 +1,54 @@ +"""LDAP Source tests""" +from unittest.mock import PropertyMock, patch + +from django.test import TestCase + +from passbook.core.models import User +from passbook.providers.oauth2.generators import generate_client_secret +from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource +from passbook.sources.ldap.password import LDAPPasswordChanger +from passbook.sources.ldap.tests.utils import _build_mock_connection + +LDAP_PASSWORD = generate_client_secret() +LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD)) + + +class LDAPPasswordTests(TestCase): + """LDAP Password tests""" + + def setUp(self): + self.source = LDAPSource.objects.create( + name="ldap", + slug="ldap", + base_dn="DC=AD2012,DC=LAB", + additional_user_dn="ou=users", + additional_group_dn="ou=groups", + ) + self.source.property_mappings.set(LDAPPropertyMapping.objects.all()) + self.source.save() + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_password_complexity(self): + """Test password without user""" + pwc = LDAPPasswordChanger(self.source) + self.assertFalse(pwc.ad_password_complexity("test")) # 1 category + self.assertFalse(pwc.ad_password_complexity("test1")) # 2 categories + self.assertTrue(pwc.ad_password_complexity("test1!")) # 2 categories + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_password_complexity_user(self): + """test password with user""" + pwc = LDAPPasswordChanger(self.source) + user = User.objects.create( + username="test", + attributes={"distinguishedName": "cn=user,ou=users,DC=AD2012,DC=LAB"}, + ) + self.assertFalse(pwc.ad_password_complexity("test", user)) # 1 category + self.assertFalse(pwc.ad_password_complexity("test1", user)) # 2 categories + self.assertTrue(pwc.ad_password_complexity("test1!", user)) # 2 categories + self.assertFalse( + pwc.ad_password_complexity("erin!qewrqewr", user) + ) # displayName token + self.assertFalse( + pwc.ad_password_complexity("hagens!qewrqewr", user) + ) # displayName token diff --git a/passbook/sources/ldap/tests/test_sync.py b/passbook/sources/ldap/tests/test_sync.py new file mode 100644 index 000000000..e2b7e1f98 --- /dev/null +++ b/passbook/sources/ldap/tests/test_sync.py @@ -0,0 +1,51 @@ +"""LDAP Source tests""" +from unittest.mock import PropertyMock, patch + +from django.test import TestCase + +from passbook.core.models import Group, User +from passbook.providers.oauth2.generators import generate_client_secret +from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource +from passbook.sources.ldap.sync import LDAPSynchronizer +from passbook.sources.ldap.tasks import sync +from passbook.sources.ldap.tests.utils import _build_mock_connection + +LDAP_PASSWORD = generate_client_secret() +LDAP_CONNECTION_PATCH = PropertyMock(return_value=_build_mock_connection(LDAP_PASSWORD)) + + +class LDAPSyncTests(TestCase): + """LDAP Sync tests""" + + def setUp(self): + self.source = LDAPSource.objects.create( + name="ldap", + slug="ldap", + base_dn="DC=AD2012,DC=LAB", + additional_user_dn="ou=users", + additional_group_dn="ou=groups", + ) + self.source.property_mappings.set(LDAPPropertyMapping.objects.all()) + self.source.save() + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_sync_users(self): + """Test user sync""" + syncer = LDAPSynchronizer(self.source) + syncer.sync_users() + self.assertTrue(User.objects.filter(username="user0_sn").exists()) + self.assertFalse(User.objects.filter(username="user1_sn").exists()) + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_sync_groups(self): + """Test group sync""" + syncer = LDAPSynchronizer(self.source) + syncer.sync_groups() + syncer.sync_membership() + group = Group.objects.filter(name="test-group") + self.assertTrue(group.exists()) + + @patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH) + def test_tasks(self): + """Test Scheduled tasks""" + sync() diff --git a/passbook/sources/ldap/tests/utils.py b/passbook/sources/ldap/tests/utils.py new file mode 100644 index 000000000..8cd129645 --- /dev/null +++ b/passbook/sources/ldap/tests/utils.py @@ -0,0 +1,93 @@ +"""ldap testing utils""" + +from ldap3 import MOCK_SYNC, OFFLINE_AD_2012_R2, Connection, Server + + +def _build_mock_connection(password: str) -> Connection: + """Create mock connection""" + server = Server("my_fake_server", get_info=OFFLINE_AD_2012_R2) + _pass = "foo" # noqa # nosec + connection = Connection( + server, + user="cn=my_user,DC=AD2012,DC=LAB", + password=_pass, + client_strategy=MOCK_SYNC, + ) + # Entry for password checking + connection.strategy.add_entry( + "cn=user,ou=users,DC=AD2012,DC=LAB", + { + "name": "test-user", + "objectSid": "unique-test-group", + "objectCategory": "Person", + "displayName": "Erin M. Hagens", + "sAMAccountName": "sAMAccountName", + "distinguishedName": "cn=user,ou=users,DC=AD2012,DC=LAB", + }, + ) + connection.strategy.add_entry( + "cn=group1,ou=groups,DC=AD2012,DC=LAB", + { + "name": "test-group", + "objectSid": "unique-test-group", + "objectCategory": "Group", + "distinguishedName": "cn=group1,ou=groups,DC=AD2012,DC=LAB", + }, + ) + # Group without SID + connection.strategy.add_entry( + "cn=group2,ou=groups,DC=AD2012,DC=LAB", + { + "name": "test-group", + "objectCategory": "Group", + "distinguishedName": "cn=group2,ou=groups,DC=AD2012,DC=LAB", + }, + ) + connection.strategy.add_entry( + "cn=user0,ou=users,DC=AD2012,DC=LAB", + { + "userPassword": password, + "sAMAccountName": "user0_sn", + "name": "user0_sn", + "revision": 0, + "objectSid": "user0", + "objectCategory": "Person", + "memberOf": "cn=group1,ou=groups,DC=AD2012,DC=LAB", + }, + ) + # User without SID + connection.strategy.add_entry( + "cn=user1,ou=users,DC=AD2012,DC=LAB", + { + "userPassword": "test1111", + "sAMAccountName": "user2_sn", + "name": "user1_sn", + "revision": 0, + "objectCategory": "Person", + }, + ) + # Duplicate users + connection.strategy.add_entry( + "cn=user2,ou=users,DC=AD2012,DC=LAB", + { + "userPassword": "test2222", + "sAMAccountName": "user2_sn", + "name": "user2_sn", + "revision": 0, + "objectSid": "unique-test2222", + "objectCategory": "Person", + }, + ) + connection.strategy.add_entry( + "cn=user3,ou=users,DC=AD2012,DC=LAB", + { + "userPassword": "test2222", + "sAMAccountName": "user2_sn", + "name": "user2_sn", + "revision": 0, + "objectSid": "unique-test2222", + "objectCategory": "Person", + }, + ) + connection.bind() + return connection diff --git a/swagger.yaml b/swagger.yaml index 3c7be2956..d5a317a40 100755 --- a/swagger.yaml +++ b/swagger.yaml @@ -5836,18 +5836,22 @@ definitions: title: Action type: string enum: - - LOGIN - - LOGIN_FAILED - - LOGOUT - - AUTHORIZE_APPLICATION - - SUSPICIOUS_REQUEST - - SIGN_UP - - PASSWORD_RESET - - INVITE_CREATED - - INVITE_USED - - IMPERSONATION_STARTED - - IMPERSONATION_ENDED - - CUSTOM + - login + - login_failed + - logout + - sign_up + - authorize_application + - suspicious_request + - password_set + - invitation_created + - invitation_used + - source_linked + - impersonation_started + - impersonation_ended + - model_created + - model_updated + - model_deleted + - custom_ date: title: Date type: string