outposts: fix outpost delete hanging thread, run cleanup in async task with info from cache with ability to retry
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
parent 4b52697cfe
commit c2f8ff55cf
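
The fix in one picture: the signal handlers previously dispatched a teardown task and then blocked on its result, which could hang the calling thread (and, in tests, trip Celery's "Never call result.get() within a task" detection, as the removed comment below notes). The new flow caches the outpost and fires the task asynchronously. A condensed before/after sketch using the names from this diff:

    # Before: block the signal handler until the worker finishes (or hangs).
    outpost_controller_down.delay(instance.pk.hex).get()

    # After: stash the instance in the cache (the DB row may be gone by the
    # time the worker runs), then fire and forget; the task can be retried
    # from the cached copy.
    cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, instance)
    outpost_controller.delay(instance.pk.hex, action="down", from_cache=True)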
@@ -56,6 +56,12 @@ class BaseController:
         """Handler to delete everything we've created"""
         raise NotImplementedError
 
+    def down_with_logs(self) -> list[str]:
+        """Call .down() but capture all log output and return it."""
+        with capture_logs() as logs:
+            self.down()
+        return [x["event"] for x in logs]
+
     def get_static_deployment(self) -> str:
         """Return a static deployment configuration"""
         raise NotImplementedError
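
down_with_logs() relies on structlog's capture_logs context manager, which records each log call as a dict. A minimal self-contained sketch of that mechanism (assuming structlog is configured for stdlib logging, as authentik's imports suggest):

    from structlog.stdlib import get_logger
    from structlog.testing import capture_logs

    LOGGER = get_logger()

    with capture_logs() as logs:
        LOGGER.info("deleting deployment")
        LOGGER.info("deleting service")

    # Each captured entry is a dict; the "event" key holds the message text.
    print([x["event"] for x in logs])  # ['deleting deployment', 'deleting service']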
@@ -67,6 +67,18 @@ class KubernetesController(BaseController):
         except ApiException as exc:
             raise ControllerException(str(exc)) from exc
 
+    def down_with_logs(self) -> list[str]:
+        try:
+            all_logs = []
+            for reconcile_key in self.reconcile_order:
+                with capture_logs() as logs:
+                    reconciler = self.reconcilers[reconcile_key](self)
+                    reconciler.down()
+                all_logs += [f"{reconcile_key.title()}: {x['event']}" for x in logs]
+            return all_logs
+        except ApiException as exc:
+            raise ControllerException(str(exc)) from exc
+
     def get_static_deployment(self) -> str:
         documents = []
         for reconcile_key in self.reconcile_order:
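
The Kubernetes variant tears down one reconciler at a time so each log line can be prefixed with its source key. A hedged sketch of the registry shape this loop iterates over (illustrative class and key names, not authentik's actual ones):

    class ServiceReconciler:
        """Illustrative reconciler: would delete the Kubernetes Service."""
        def __init__(self, controller):
            self.controller = controller
        def down(self):
            ...  # delete the Service via the Kubernetes API

    class DeploymentReconciler:
        """Illustrative reconciler: would delete the Kubernetes Deployment."""
        def __init__(self, controller):
            self.controller = controller
        def down(self):
            ...  # delete the Deployment via the Kubernetes API

    # Teardown runs in reconcile_order; reconcilers maps key -> class, and
    # each class is instantiated with the controller before .down() is called.
    reconcile_order = ["service", "deployment"]
    reconcilers = {"service": ServiceReconciler, "deployment": DeploymentReconciler}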
@@ -1,5 +1,5 @@
 """authentik outpost signals"""
-from django.conf import settings
+from django.core.cache import cache
 from django.db.models import Model
 from django.db.models.signals import post_save, pre_delete, pre_save
 from django.dispatch import receiver
@@ -8,9 +8,12 @@ from structlog.stdlib import get_logger
 from authentik.core.models import Provider
 from authentik.crypto.models import CertificateKeyPair
 from authentik.lib.utils.reflection import class_to_path
-from authentik.outposts.controllers.base import ControllerException
 from authentik.outposts.models import Outpost, OutpostServiceConnection
-from authentik.outposts.tasks import outpost_controller_down, outpost_post_save
+from authentik.outposts.tasks import (
+    CACHE_KEY_OUTPOST_DOWN,
+    outpost_controller,
+    outpost_post_save,
+)
 
 LOGGER = get_logger()
 UPDATE_TRIGGERING_MODELS = (
@@ -39,7 +42,8 @@ def pre_save_outpost(sender, instance: Outpost, **_):
     )
     if bool(dirty):
         LOGGER.info("Outpost needs re-deployment due to changes", instance=instance)
-        outpost_controller_down_wrapper(old_instance)
+        cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, old_instance)
+        outpost_controller.delay(instance.pk.hex, action="down", from_cache=True)
 
 
 @receiver(post_save)
@@ -63,23 +67,5 @@ def post_save_update(sender, instance: Model, **_):
 def pre_delete_cleanup(sender, instance: Outpost, **_):
     """Ensure that Outpost's user is deleted (which will delete the token through cascade)"""
     instance.user.delete()
-    outpost_controller_down_wrapper(instance)
-
-
-def outpost_controller_down_wrapper(instance: Outpost):
-    """To ensure that deployment is cleaned up *consistently* we call the controller, and wait
-    for it to finish. We don't want to call it in this thread, as we don't have the Outpost
-    Service connection here"""
-    try:
-        outpost_controller_down.delay(instance.pk.hex).get()
-    except RuntimeError:  # pragma: no cover
-        # In e2e/integration tests, this might run inside a thread/process and
-        # trigger the celery `Never call result.get() within a task` detection
-        if settings.TEST:
-            pass
-        else:
-            raise
-    except ControllerException as exc:
-        LOGGER.warning(
-            "failed to cleanup outpost deployment", exc=exc, instance=instance
-        )
+    cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, instance)
+    outpost_controller.delay(instance.pk.hex, action="down", from_cache=True)
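
Both signal handlers now use the same cache round trip. Django's cache framework pickles values by default, which is what allows a full model instance (not just a pk) to survive past the row's deletion; a sketch of the round trip, with the worker side shown for context:

    # Signal side: the instance is pickled into the cache under a key
    # derived from its primary key.
    cache.set(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex, instance)

    # Worker side (see outpost_controller below): the row may already be
    # deleted, so the task reads the cached copy instead of querying the DB.
    outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % instance.pk.hex)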
@@ -36,6 +36,7 @@ from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesCont
 from authentik.root.celery import CELERY_APP
 
 LOGGER = get_logger()
+CACHE_KEY_OUTPOST_DOWN = "outpost_teardown_%s"
 
 
 def controller_for_outpost(outpost: Outpost) -> Optional[BaseController]:
@@ -56,13 +57,6 @@ def controller_for_outpost(outpost: Outpost) -> Optional[BaseController]:
     return None
 
 
-@CELERY_APP.task()
-def outpost_controller_all():
-    """Launch Controller for all Outposts which support it"""
-    for outpost in Outpost.objects.exclude(service_connection=None):
-        outpost_controller.delay(outpost.pk.hex)
-
-
 @CELERY_APP.task()
 def outpost_service_connection_state(connection_pk: Any):
     """Update cached state of a service connection"""
@@ -89,17 +83,29 @@ def outpost_service_connection_monitor(self: MonitoredTask):
     )
 
 
+@CELERY_APP.task()
+def outpost_controller_all():
+    """Launch Controller for all Outposts which support it"""
+    for outpost in Outpost.objects.exclude(service_connection=None):
+        outpost_controller.delay(outpost.pk.hex, "up", from_cache=False)
+
+
 @CELERY_APP.task(bind=True, base=MonitoredTask)
-def outpost_controller(self: MonitoredTask, outpost_pk: str):
-    """Create/update/monitor the deployment of an Outpost"""
+def outpost_controller(
+    self: MonitoredTask, outpost_pk: str, action: str = "up", from_cache: bool = False
+):
+    """Create/update/monitor/delete the deployment of an Outpost"""
     logs = []
-    outpost: Outpost = Outpost.objects.get(pk=outpost_pk)
+    if from_cache:
+        outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
+    else:
+        outpost: Outpost = Outpost.objects.get(pk=outpost_pk)
     self.set_uid(slugify(outpost.name))
     try:
         controller = controller_for_outpost(outpost)
         if not controller:
             return
-        logs = controller.up_with_logs()
+        logs = getattr(controller, f"{action}_with_logs")()
         LOGGER.debug("---------------Outpost Controller logs starting----------------")
         for log in logs:
             LOGGER.debug(log)
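
The action parameter selects the controller method by name via getattr, so "up" resolves to up_with_logs() and "down" to down_with_logs(). A tiny standalone illustration of that dispatch (FakeController is a stand-in, not authentik's class):

    class FakeController:
        """Stand-in for BaseController, just to show the dispatch."""
        def up_with_logs(self):
            return ["created deployment"]
        def down_with_logs(self):
            return ["deleted deployment"]

    controller = FakeController()
    for action in ("up", "down"):
        logs = getattr(controller, f"{action}_with_logs")()
        print(action, logs)  # up ['created deployment'] / down ['deleted deployment']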
@@ -110,16 +116,6 @@ def outpost_controller(self: MonitoredTask, outpost_pk: str):
         self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
 
 
-@CELERY_APP.task()
-def outpost_controller_down(outpost_pk: str):
-    """Delete outpost objects before deleting the DB Object"""
-    outpost = Outpost.objects.get(pk=outpost_pk)
-    controller = controller_for_outpost(outpost)
-    if not controller:
-        return
-    controller.down()
-
-
 @CELERY_APP.task(bind=True, base=MonitoredTask)
 def outpost_token_ensurer(self: MonitoredTask):
     """Periodically ensure that all Outposts have valid Service Accounts
@@ -531,11 +531,6 @@ paths:
           description: ''
           required: false
           type: string
-        - name: ordering
-          in: query
-          description: Which field to use when ordering the results.
-          required: false
-          type: string
         - name: search
           in: query
           description: A search term.