This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/audit/middleware.py

88 lines
3 KiB
Python
Raw Normal View History

"""Audit middleware"""
from functools import partial
from typing import Callable
from django.contrib.auth.models import User
from django.db.models import Model
from django.db.models.signals import post_save, pre_delete
from django.http import HttpRequest, HttpResponse
from passbook.audit.models import Event, EventAction, model_to_dict
from passbook.audit.signals import EventNewThread
from passbook.core.middleware import LOCAL
class AuditMiddleware:
    """Register handlers for duration of request-response that log creation/update/deletion
    of models.

    For every request made by an authenticated user, ``post_save`` and
    ``pre_delete`` receivers are connected before the request is passed down
    the middleware chain and disconnected again once it comes back. The
    receivers are keyed by the current request id (``LOCAL.passbook["request_id"]``)
    via ``dispatch_uid``.
    """

    get_response: Callable[[HttpRequest], HttpResponse]

    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        """Store the next callable in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Handle *request*, logging model changes made while it is processed.

        Receivers are only connected when ``request.user`` exists and reports
        ``is_authenticated``. They are always disconnected afterwards, even if
        ``get_response`` raises.
        """
        # Connect signal for automatic logging
        if hasattr(request, "user") and getattr(
            request.user, "is_authenticated", False
        ):
            # Bind the current user/request so the receivers know who
            # triggered the change.
            post_save_handler = partial(
                self.post_save_handler, user=request.user, request=request
            )
            pre_delete_handler = partial(
                self.pre_delete_handler, user=request.user, request=request
            )
            # weak=False: the partials are local objects; with the default
            # weak reference nothing else would keep them alive.
            post_save.connect(
                post_save_handler,
                dispatch_uid=LOCAL.passbook["request_id"],
                weak=False,
            )
            pre_delete.connect(
                pre_delete_handler,
                dispatch_uid=LOCAL.passbook["request_id"],
                weak=False,
            )

        try:
            return self.get_response(request)
        finally:
            # Because the receivers are strongly referenced (weak=False), a
            # missed disconnect would leave them -- and the request/user they
            # hold -- registered on the global signals. Always clean up.
            self._disconnect_handlers()

    @staticmethod
    def _disconnect_handlers():
        """Disconnect the receivers registered under the current request id.

        Disconnecting a ``dispatch_uid`` that was never connected is harmless,
        so this is safe to call for unauthenticated requests and more than
        once per request.
        """
        post_save.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
        pre_delete.disconnect(dispatch_uid=LOCAL.passbook["request_id"])

    # pylint: disable=unused-argument
    def process_exception(self, request: HttpRequest, exception: Exception):
        """Unregister handlers in case of exception"""
        # Returns None implicitly; this hook only disconnects the receivers.
        self._disconnect_handlers()

    @staticmethod
    # pylint: disable=unused-argument
    def post_save_handler(
        user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
    ):
        """Signal handler for all object's post_save

        Emits a ``MODEL_CREATED`` or ``MODEL_UPDATED`` event (depending on
        *created*) carrying ``model_to_dict(instance)``. ``Event`` instances
        are skipped -- presumably so that audit events do not themselves
        generate further events.
        """
        if isinstance(instance, Event):
            return

        action = EventAction.MODEL_CREATED if created else EventAction.MODEL_UPDATED
        # NOTE(review): ``run()`` is invoked directly rather than ``start()``;
        # if EventNewThread is a ``threading.Thread`` subclass this executes
        # in the calling thread -- confirm this is intended.
        EventNewThread(
            action, request, user=user, kwargs={"model": model_to_dict(instance)}
        ).run()

    @staticmethod
    # pylint: disable=unused-argument
    def pre_delete_handler(
        user: User, request: HttpRequest, sender, instance: Model, **_
    ):
        """Signal handler for all object's pre_delete

        Emits a ``MODEL_DELETED`` event carrying ``model_to_dict(instance)``.
        ``Event`` instances are skipped.
        """
        if isinstance(instance, Event):
            return

        # NOTE(review): see post_save_handler regarding ``run()`` vs ``start()``.
        EventNewThread(
            EventAction.MODEL_DELETED,
            request,
            user=user,
            kwargs={"model": model_to_dict(instance)},
        ).run()