bc9e7e8b93
* build(deps): bump structlog from 20.1.0 to 20.2.0 Bumps [structlog](https://github.com/hynek/structlog) from 20.1.0 to 20.2.0. - [Release notes](https://github.com/hynek/structlog/releases) - [Changelog](https://github.com/hynek/structlog/blob/master/CHANGELOG.rst) - [Commits](https://github.com/hynek/structlog/compare/20.1.0...20.2.0) Signed-off-by: dependabot[bot] <support@github.com> * *: use structlog.stdlib instead of structlog for type-hints Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Jens Langhammer <jens.langhammer@beryju.org>
31 lines
851 B
Python
31 lines
851 B
Python
"""Channels base classes"""
|
|
from channels.exceptions import DenyConnection
|
|
from channels.generic.websocket import JsonWebsocketConsumer
|
|
from structlog.stdlib import get_logger
|
|
|
|
from authentik.api.auth import token_from_header
|
|
from authentik.core.models import User
|
|
|
|
LOGGER = get_logger()
|
|
|
|
|
|
class AuthJsonConsumer(JsonWebsocketConsumer):
|
|
"""Authorize a client with a token"""
|
|
|
|
user: User
|
|
|
|
def connect(self):
|
|
headers = dict(self.scope["headers"])
|
|
if b"authorization" not in headers:
|
|
LOGGER.warning("WS Request without authorization header")
|
|
raise DenyConnection()
|
|
|
|
raw_header = headers[b"authorization"]
|
|
|
|
token = token_from_header(raw_header)
|
|
if not token:
|
|
LOGGER.warning("Failed to authenticate")
|
|
raise DenyConnection()
|
|
|
|
self.user = token.user
|