292 lines
9.3 KiB
Python
292 lines
9.3 KiB
Python
import json
|
|
import base64
|
|
import logging
|
|
import requests
|
|
|
|
from django.template import loader
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from django.conf import settings
|
|
from django.views.generic.edit import View, FormView
|
|
from django.http import HttpResponse, Http404, JsonResponse, QueryDict
|
|
from django.shortcuts import get_object_or_404, redirect
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.urls import reverse_lazy
|
|
from django.contrib import messages
|
|
from django.contrib.auth import get_user_model
|
|
|
|
from oidc4vp.models import Authorization, Organization, OAuth2VPToken
|
|
from idhub.mixins import UserView
|
|
from idhub.models import Event
|
|
|
|
from oidc4vp.forms import AuthorizeForm
|
|
|
|
|
|
|
|
User = get_user_model()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuthorizeView(UserView, FormView):
|
|
title = _("My wallet")
|
|
section = "MyWallet"
|
|
template_name = "credentials_presentation.html"
|
|
subtitle = _('Credential presentation')
|
|
icon = 'bi bi-patch-check-fill'
|
|
form_class = AuthorizeForm
|
|
success_url = reverse_lazy('idhub:user_demand_authorization')
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
response = super().get(request, *args, **kwargs)
|
|
if self.request.session.get('next_url'):
|
|
return redirect(reverse_lazy('idhub:user_credentials_request'))
|
|
return response
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super().get_form_kwargs()
|
|
kwargs['user'] = self.request.user
|
|
try:
|
|
vps = json.loads(self.request.GET.get('presentation_definition'))
|
|
except Exception:
|
|
vps = []
|
|
kwargs['presentation_definition'] = vps
|
|
kwargs["org"] = self.get_org()
|
|
kwargs["code"] = self.request.GET.get('code')
|
|
return kwargs
|
|
|
|
def get_form(self, form_class=None):
|
|
form = super().get_form(form_class=form_class)
|
|
if form.all_credentials.exists() and not form.credentials.exists():
|
|
self.request.session['next_url'] = self.request.get_full_path()
|
|
return form
|
|
|
|
def form_valid(self, form):
|
|
authorization = form.save()
|
|
if not authorization or authorization.status_code != 200:
|
|
messages.error(self.request, _("Error sending credential!"))
|
|
return redirect(self.success_url)
|
|
try:
|
|
authorization = authorization.json()
|
|
except Exception:
|
|
messages.error(self.request, _("Error sending credential!"))
|
|
return redirect(self.success_url)
|
|
|
|
verify = authorization.get('verify')
|
|
result, msg = verify.split(",")
|
|
if 'error' in result.lower():
|
|
messages.error(self.request, msg)
|
|
if 'ok' in result.lower():
|
|
messages.success(self.request, msg)
|
|
|
|
cred = form.credentials.first()
|
|
verifier = form.org.name
|
|
if cred and verifier:
|
|
Event.set_EV_CREDENTIAL_PRESENTED(cred, verifier)
|
|
|
|
if authorization.get('redirect_uri'):
|
|
return redirect(authorization.get('redirect_uri'))
|
|
elif authorization.get('response'):
|
|
txt = authorization.get('response')
|
|
messages.success(self.request, txt)
|
|
Event.set_EV_USR_SEND_CREDENTIAL(txt)
|
|
txt2 = f"Verifier {verifier} send: " + txt
|
|
Event.set_EV_USR_SEND_VP(txt2, self.request.user)
|
|
url = reverse_lazy('idhub:user_dashboard')
|
|
return redirect(url)
|
|
|
|
return redirect(self.success_url)
|
|
|
|
def get_org(self):
|
|
client_id = self.request.GET.get("client_id")
|
|
if not client_id:
|
|
raise Http404("Organization not found!")
|
|
|
|
org = get_object_or_404(
|
|
Organization,
|
|
client_id=client_id,
|
|
)
|
|
return org
|
|
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
|
|
class VerifyView(View):
|
|
subject_template_name = 'email/verify_subject.txt'
|
|
email_template_name = 'email/verify_email.txt'
|
|
html_email_template_name = 'email/verify_email.html'
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
org = self.validate(request)
|
|
presentation_definition = json.dumps(settings.SUPPORTED_CREDENTIALS)
|
|
authorization = Authorization(
|
|
organization=org,
|
|
presentation_definition=presentation_definition
|
|
)
|
|
authorization.save()
|
|
res = json.dumps({"redirect_uri": authorization.authorize()})
|
|
return HttpResponse(res)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
code = self.request.POST.get("code")
|
|
vp_tk = self.request.POST.get("vp_token")
|
|
|
|
if not vp_tk or not code:
|
|
raise Http404("Page not Found!")
|
|
|
|
org = self.validate(request)
|
|
|
|
self.vp_token = OAuth2VPToken(
|
|
vp_token = vp_tk,
|
|
organization=org,
|
|
code=code
|
|
)
|
|
|
|
if not self.vp_token.authorization:
|
|
raise Http404("Page not Found!")
|
|
|
|
self.vp_token.verifing()
|
|
response = self.vp_token.get_response_verify()
|
|
self.vp_token.save()
|
|
|
|
for user in User.objects.filter(is_admin=True):
|
|
self.send_email(user)
|
|
|
|
response["response"] = "Validation Code {}".format(code)
|
|
return JsonResponse(response)
|
|
|
|
def validate(self, request):
|
|
auth_header = request.headers.get('Authorization', b'')
|
|
auth_data = auth_header.split()
|
|
|
|
if len(auth_data) == 2 and auth_data[0].lower() == 'basic':
|
|
decoded_auth = base64.b64decode(auth_data[1]).decode('utf-8')
|
|
client_id, client_secret = decoded_auth.split(':', 1)
|
|
org = get_object_or_404(
|
|
Organization,
|
|
client_id=client_id,
|
|
client_secret=client_secret
|
|
)
|
|
return org
|
|
|
|
raise Http404("Page not Found!")
|
|
|
|
def send_email(self, user):
|
|
"""
|
|
Send a email when a user is activated.
|
|
"""
|
|
verification = self.vp_token.get_result_verify()
|
|
if not verification:
|
|
return
|
|
|
|
if verification.get('errors') or verification.get('warnings'):
|
|
return
|
|
|
|
email = self.get_email(user)
|
|
try:
|
|
if settings.ENABLE_EMAIL:
|
|
email.send()
|
|
return
|
|
|
|
logger.warning(user.email)
|
|
logger.warning(email.body)
|
|
|
|
except Exception as err:
|
|
logger.error(err)
|
|
return
|
|
|
|
def get_context(self):
|
|
url_domain = "https://{}/".format(settings.DOMAIN)
|
|
context = {
|
|
"domain": settings.DOMAIN,
|
|
"url_domain": url_domain,
|
|
"verification": self.get_verification(),
|
|
"code": self.vp_token.code,
|
|
}
|
|
return context
|
|
|
|
def get_email(self, user):
|
|
context = self.get_context()
|
|
subject = loader.render_to_string(self.subject_template_name, context)
|
|
# Email subject *must not* contain newlines
|
|
subject = ''.join(subject.splitlines())
|
|
body = loader.render_to_string(self.email_template_name, context)
|
|
from_email = settings.DEFAULT_FROM_EMAIL
|
|
to_email = user.email
|
|
|
|
email_message = EmailMultiAlternatives(
|
|
subject, body, from_email, [to_email])
|
|
html_email = loader.render_to_string(self.html_email_template_name, context)
|
|
email_message.attach_alternative(html_email, 'text/html')
|
|
return email_message
|
|
|
|
def get_verification(self):
|
|
return self.vp_token.get_user_info_all()
|
|
|
|
|
|
class AllowCodeView(View):
|
|
def get(self, request, *args, **kwargs):
|
|
code = self.request.GET.get("code")
|
|
|
|
if not code:
|
|
raise Http404("Page not Found!")
|
|
self.authorization = get_object_or_404(
|
|
Authorization,
|
|
code=code,
|
|
code_used=False
|
|
)
|
|
if self.request.session.get("response_uri"):
|
|
url = self.send_api()
|
|
if url:
|
|
return redirect(url)
|
|
|
|
promotion = self.authorization.promotions.first()
|
|
if not promotion:
|
|
return redirect(reverse_lazy('oidc4vp:received_code'))
|
|
|
|
return redirect(promotion.get_url(code))
|
|
|
|
def send_api(self):
|
|
vp = self.get_vp_token()
|
|
if not vp:
|
|
return
|
|
|
|
data = {
|
|
"vp_token": vp,
|
|
"code": self.authorization.code
|
|
}
|
|
url = self.request.session.get("response_uri")
|
|
result = requests.post(url, data=data)
|
|
return result.json().get('redirect_uri')
|
|
|
|
def get_vp_token(self):
|
|
vp = self.authorization.vp_tokens.first()
|
|
if not vp:
|
|
return
|
|
return base64.b64encode(vp.vp_token.encode()).decode()
|
|
|
|
def get_response_uri(self):
|
|
data = {
|
|
"code": self.authorization.code,
|
|
}
|
|
query_dict = QueryDict('', mutable=True)
|
|
query_dict.update(data)
|
|
|
|
response_uri = self.request.session.get("response_uri")
|
|
|
|
url = '{response_uri}?{params}'.format(
|
|
response_uri=response_uri,
|
|
params=query_dict.urlencode()
|
|
)
|
|
return url
|
|
|
|
|
|
class ReceivedCodeView(View):
|
|
template_name = "received_code.html"
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
self.context = {}
|
|
template = loader.get_template(
|
|
self.template_name,
|
|
).render()
|
|
return HttpResponse(template)
|