This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/policies/expression/evaluator.py
2020-06-02 15:20:02 +02:00

137 lines
5.3 KiB
Python

"""passbook expression policy evaluator"""
import re
from typing import Any, Dict, List, Optional
from django.core.exceptions import ValidationError
from django.http import HttpRequest
from jinja2 import Undefined
from jinja2.exceptions import TemplateSyntaxError
from jinja2.nativetypes import NativeEnvironment
from requests import Session
from structlog import get_logger
from passbook.core.models import User
from passbook.flows.planner import PLAN_CONTEXT_SSO
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.utils.http import get_client_ip
from passbook.policies.types import PolicyRequest, PolicyResult
LOGGER = get_logger()
class Evaluator:
"""Validate and evaluate jinja2-based expressions"""
_env: NativeEnvironment
_context: Dict[str, Any]
_messages: List[str]
def __init__(self):
self._env = NativeEnvironment(
extensions=["jinja2.ext.do"],
trim_blocks=True,
lstrip_blocks=True,
line_statement_prefix=">",
)
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._env.filters["regex_match"] = Evaluator.jinja2_filter_regex_match
self._env.filters["regex_replace"] = Evaluator.jinja2_filter_regex_replace
self._env.globals["pb_message"] = self.jinja2_func_message
self._context = {
"pb_is_group_member": Evaluator.jinja2_func_is_group_member,
"pb_user_by": Evaluator.jinja2_func_user_by,
"pb_logger": get_logger(),
"requests": Session(),
}
self._messages = []
@property
def env(self) -> NativeEnvironment:
"""Access to our custom NativeEnvironment"""
return self._env
@staticmethod
def jinja2_filter_regex_match(value: Any, regex: str) -> bool:
"""Jinja2 Filter to run re.search"""
return re.search(regex, value) is None
@staticmethod
def jinja2_filter_regex_replace(value: Any, regex: str, repl: str) -> str:
"""Jinja2 Filter to run re.sub"""
return re.sub(regex, repl, value)
@staticmethod
def jinja2_func_user_by(**filters) -> Optional[User]:
"""Get user by filters"""
users = User.objects.filter(**filters)
if users:
return users.first()
return None
@staticmethod
def jinja2_func_is_group_member(user: User, group_name: str) -> bool:
"""Check if `user` is member of group with name `group_name`"""
return user.groups.filter(name=group_name).exists()
def jinja2_func_message(self, message: str):
"""Wrapper to append to messages list, which is returned with PolicyResult"""
self._messages.append(message)
def set_policy_request(self, request: PolicyRequest):
"""Update context based on policy request (if http request is given, update that too)"""
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._context["pb_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False)
self._context["request"] = request
if request.http_request:
self.set_http_request(request.http_request)
def set_http_request(self, request: HttpRequest):
"""Update context based on http request"""
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._context["pb_client_ip"] = (
get_client_ip(request.http_request) or "255.255.255.255"
)
self._context["request"] = request
if SESSION_KEY_PLAN in request.http_request.session:
self._context["pb_flow_plan"] = request.http_request.session[
SESSION_KEY_PLAN
]
def evaluate(self, expression_source: str) -> PolicyResult:
"""Parse and evaluate expression. Policy is expected to return a truthy object.
Messages can be added using 'do pb_message()'."""
try:
expression = self._env.from_string(expression_source.lstrip().rstrip())
except TemplateSyntaxError as exc:
return PolicyResult(False, str(exc))
try:
result: Optional[Any] = expression.render(self._context)
except Exception as exc: # pylint: disable=broad-except
LOGGER.warning("Expression error", exc=exc)
return PolicyResult(False, str(exc))
else:
policy_result = PolicyResult(False)
policy_result.messages = tuple(self._messages)
if isinstance(result, Undefined):
LOGGER.warning(
"Expression policy returned undefined",
src=expression_source,
req=self._context,
)
policy_result.passing = False
if result:
policy_result.passing = bool(result)
return policy_result
def validate(self, expression: str):
"""Validate expression's syntax, raise ValidationError if Syntax is invalid"""
try:
self._env.from_string(expression)
return True
except TemplateSyntaxError as exc:
raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc