From c2f8ff55cf4e6749529fdfc247e879ffdabe4e18 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Mon, 10 May 2021 17:11:31 +0200 Subject: [PATCH] outposts: fix outpost delete hanging thread, run cleanup in async task with info from cache with ability to retry Signed-off-by: Jens Langhammer --- authentik/outposts/controllers/base.py | 6 ++++ authentik/outposts/controllers/kubernetes.py | 12 +++++++ authentik/outposts/signals.py | 34 ++++++------------ authentik/outposts/tasks.py | 38 +++++++++----------- swagger.yaml | 5 --- 5 files changed, 45 insertions(+), 50 deletions(-) diff --git a/authentik/outposts/controllers/base.py b/authentik/outposts/controllers/base.py index 4777a61d0..ef22b79d5 100644 --- a/authentik/outposts/controllers/base.py +++ b/authentik/outposts/controllers/base.py @@ -56,6 +56,12 @@ class BaseController: """Handler to delete everything we've created""" raise NotImplementedError + def down_with_logs(self) -> list[str]: + """Call .down() but capture all log output and return it.""" + with capture_logs() as logs: + self.down() + return [x["event"] for x in logs] + def get_static_deployment(self) -> str: """Return a static deployment configuration""" raise NotImplementedError diff --git a/authentik/outposts/controllers/kubernetes.py b/authentik/outposts/controllers/kubernetes.py index 09888ee81..b13cc5315 100644 --- a/authentik/outposts/controllers/kubernetes.py +++ b/authentik/outposts/controllers/kubernetes.py @@ -67,6 +67,18 @@ class KubernetesController(BaseController): except ApiException as exc: raise ControllerException(str(exc)) from exc + def down_with_logs(self) -> list[str]: + try: + all_logs = [] + for reconcile_key in self.reconcile_order: + with capture_logs() as logs: + reconciler = self.reconcilers[reconcile_key](self) + reconciler.down() + all_logs += [f"{reconcile_key.title()}: {x['event']}" for x in logs] + return all_logs + except ApiException as exc: + raise ControllerException(str(exc)) from exc + def get_static_deployment(self) -> str: documents = [] for reconcile_key in self.reconcile_order: diff --git a/authentik/outposts/signals.py b/authentik/outposts/signals.py index edf85dc0a..85667cf37 100644 --- a/authentik/outposts/signals.py +++ b/authentik/outposts/signals.py @@ -1,5 +1,5 @@ """authentik outpost signals""" -from django.conf import settings +from django.core.cache import cache from django.db.models import Model from django.db.models.signals import post_save, pre_delete, pre_save from django.dispatch import receiver @@ -8,9 +8,12 @@ from structlog.stdlib import get_logger from authentik.core.models import Provider from authentik.crypto.models import CertificateKeyPair from authentik.lib.utils.reflection import class_to_path -from authentik.outposts.controllers.base import ControllerException from authentik.outposts.models import Outpost, OutpostServiceConnection -from authentik.outposts.tasks import outpost_controller_down, outpost_post_save +from authentik.outposts.tasks import ( + CACHE_KEY_OUTPOST_DOWN, + outpost_controller, + outpost_post_save, +) LOGGER = get_logger() UPDATE_TRIGGERING_MODELS = ( @@ -39,7 +42,8 @@ def pre_save_outpost(sender, instance: Outpost, **_): ) if bool(dirty): LOGGER.info("Outpost needs re-deployment due to changes", instance=instance) - outpost_controller_down_wrapper(old_instance) + cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, old_instance) + outpost_controller.delay(instance.pk.hex, action="down", from_cache=True) @receiver(post_save) @@ -63,23 +67,5 @@ def post_save_update(sender, instance: Model, **_): def pre_delete_cleanup(sender, instance: Outpost, **_): """Ensure that Outpost's user is deleted (which will delete the token through cascade)""" instance.user.delete() - outpost_controller_down_wrapper(instance) - - -def outpost_controller_down_wrapper(instance: Outpost): - """To ensure that deployment is cleaned up *consistently* we call the controller, and wait - for it to finish. We don't want to call it in this thread, as we don't have the Outpost - Service connection here""" - try: - outpost_controller_down.delay(instance.pk.hex).get() - except RuntimeError: # pragma: no cover - # In e2e/integration tests, this might run inside a thread/process and - # trigger the celery `Never call result.get() within a task` detection - if settings.TEST: - pass - else: - raise - except ControllerException as exc: - LOGGER.warning( - "failed to cleanup outpost deployment", exc=exc, instance=instance - ) + cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, instance) + outpost_controller.delay(instance.pk.hex, action="down", from_cache=True) diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index 2c247fc83..982acfbee 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -36,6 +36,7 @@ from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesCont from authentik.root.celery import CELERY_APP LOGGER = get_logger() +CACHE_KEY_OUTPOST_DOWN = "outpost_teardown_%s" def controller_for_outpost(outpost: Outpost) -> Optional[BaseController]: @@ -56,13 +57,6 @@ def controller_for_outpost(outpost: Outpost) -> Optional[BaseController]: return None -@CELERY_APP.task() -def outpost_controller_all(): - """Launch Controller for all Outposts which support it""" - for outpost in Outpost.objects.exclude(service_connection=None): - outpost_controller.delay(outpost.pk.hex) - - @CELERY_APP.task() def outpost_service_connection_state(connection_pk: Any): """Update cached state of a service connection""" @@ -89,17 +83,29 @@ def outpost_service_connection_monitor(self: MonitoredTask): ) +@CELERY_APP.task() +def outpost_controller_all(): + """Launch Controller for all Outposts which support it""" + for outpost in Outpost.objects.exclude(service_connection=None): + outpost_controller.delay(outpost.pk.hex, "up", from_cache=False) + + @CELERY_APP.task(bind=True, base=MonitoredTask) -def outpost_controller(self: MonitoredTask, outpost_pk: str): - """Create/update/monitor the deployment of an Outpost""" +def outpost_controller( + self: MonitoredTask, outpost_pk: str, action: str = "up", from_cache: bool = False +): + """Create/update/monitor/delete the deployment of an Outpost""" logs = [] - outpost: Outpost = Outpost.objects.get(pk=outpost_pk) + if from_cache: + outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk) + else: + outpost: Outpost = Outpost.objects.get(pk=outpost_pk) self.set_uid(slugify(outpost.name)) try: controller = controller_for_outpost(outpost) if not controller: return - logs = controller.up_with_logs() + logs = getattr(controller, f"{action}_with_logs")() LOGGER.debug("---------------Outpost Controller logs starting----------------") for log in logs: LOGGER.debug(log) @@ -110,16 +116,6 @@ def outpost_controller(self: MonitoredTask, outpost_pk: str): self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs)) -@CELERY_APP.task() -def outpost_controller_down(outpost_pk: str): - """Delete outpost objects before deleting the DB Object""" - outpost = Outpost.objects.get(pk=outpost_pk) - controller = controller_for_outpost(outpost) - if not controller: - return - controller.down() - - @CELERY_APP.task(bind=True, base=MonitoredTask) def outpost_token_ensurer(self: MonitoredTask): """Periodically ensure that all Outposts have valid Service Accounts diff --git a/swagger.yaml b/swagger.yaml index aa676bfa8..edf6a34d3 100755 --- a/swagger.yaml +++ b/swagger.yaml @@ -531,11 +531,6 @@ paths: description: '' required: false type: string - - name: ordering - in: query - description: Which field to use when ordering the results. - required: false - type: string - name: search in: query description: A search term.