This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/tests/integration/test_proxy_kubernetes.py
Jens Langhammer b8a566f4a0 outposts: move local connection check to task, run every 60 minutes
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-04-21 11:34:48 +02:00

60 lines
2.2 KiB
Python

"""Test Controllers"""
import yaml
from django.test import TestCase
from authentik.flows.models import Flow
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
from authentik.outposts.tasks import outpost_local_connection
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
from authentik.providers.proxy.models import ProxyProvider
class TestProxyKubernetes(TestCase):
"""Test Controllers"""
def setUp(self):
# Ensure that local connection have been created
outpost_local_connection()
def test_kubernetes_controller_static(self):
"""Test Kubernetes Controller"""
provider: ProxyProvider = ProxyProvider.objects.create(
name="test",
internal_host="http://localhost",
external_host="http://localhost",
authorization_flow=Flow.objects.first(),
)
service_connection = KubernetesServiceConnection.objects.first()
outpost: Outpost = Outpost.objects.create(
name="test",
type=OutpostType.PROXY,
service_connection=service_connection,
)
outpost.providers.add(provider)
outpost.save()
controller = ProxyKubernetesController(outpost, service_connection)
manifest = controller.get_static_deployment()
self.assertEqual(len(list(yaml.load_all(manifest, Loader=yaml.SafeLoader))), 4)
def test_kubernetes_controller_deploy(self):
"""Test Kubernetes Controller"""
provider: ProxyProvider = ProxyProvider.objects.create(
name="test",
internal_host="http://localhost",
external_host="http://localhost",
authorization_flow=Flow.objects.first(),
)
service_connection = KubernetesServiceConnection.objects.first()
outpost: Outpost = Outpost.objects.create(
name="test",
type=OutpostType.PROXY,
service_connection=service_connection,
)
outpost.providers.add(provider)
outpost.save()
controller = ProxyKubernetesController(outpost, service_connection)
controller.up()
controller.down()