From 000244e387881db86bac051b37d7eb912b541298 Mon Sep 17 00:00:00 2001 From: Jens L Date: Mon, 18 Sep 2023 21:38:01 +0200 Subject: [PATCH] sources/ldap: add lock to sync (#6930) --- authentik/sources/ldap/tasks.py | 36 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 0223a68ba..026f398b6 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -4,6 +4,8 @@ from uuid import uuid4 from celery import chain, group from django.core.cache import cache from ldap3.core.exceptions import LDAPException +from redis.exceptions import LockError +from redis.lock import Lock from structlog.stdlib import get_logger from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus @@ -45,18 +47,28 @@ def ldap_sync_single(source_pk: str): source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first() if not source: return - task = chain( - # User and group sync can happen at once, they have no dependencies on each other - group( - ldap_sync_paginator(source, UserLDAPSynchronizer) - + ldap_sync_paginator(source, GroupLDAPSynchronizer), - ), - # Membership sync needs to run afterwards - group( - ldap_sync_paginator(source, MembershipLDAPSynchronizer), - ), - ) - task() + lock = Lock(cache.client.get_client(), name=f"goauthentik.io/sources/ldap/sync-{source.slug}") + if lock.locked(): + LOGGER.debug("LDAP sync locked, skipping task", source=source.slug) + return + try: + with lock: + task = chain( + # User and group sync can happen at once, they have no dependencies on each other + group( + ldap_sync_paginator(source, UserLDAPSynchronizer) + + ldap_sync_paginator(source, GroupLDAPSynchronizer), + ), + # Membership sync needs to run afterwards + group( + ldap_sync_paginator(source, MembershipLDAPSynchronizer), + ), + ) + task() + except LockError: + # This should never happen, we check if the lock is locked above so this + # would only happen if there was some other timeout + LOGGER.debug("Failed to acquire lock for LDAP sync", source=source.slug) def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) -> list: