IdHub/oidc4vp/models.py

326 lines
9.9 KiB
Python

import json
import requests
import secrets
import nacl
import base64
from nacl import pwhash, secret
from django.core.cache import cache
from django.conf import settings
from django.http import QueryDict
from django.utils.translation import gettext_lazy as _
from idhub_auth.models import User
from django.db import models
from utils.idhub_ssikit import verify_presentation
SALT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
def gen_salt(length: int) -> str:
"""Generate a random string of SALT_CHARS with specified ``length``."""
if length <= 0:
raise ValueError("Salt length must be positive")
return "".join(secrets.choice(SALT_CHARS) for _ in range(length))
def set_client_id():
return gen_salt(24)
def set_client_secret():
return gen_salt(48)
def set_code():
return gen_salt(24)
class Organization(models.Model):
"""
This class represent a member of one net trust or federated host.
Client_id and client_secret are the credentials of this organization
get a connection to my. (receive a request)
My_client_id and my_client_secret are my credentials than to use if I
want to connect to this organization. (send a request)
For use the packages requests we need use my_client_id
For use in the get or post method of a View, then we need use client_id
and secret_id.
main is a field which indicates the organization of this idhub
"""
name = models.CharField(max_length=250)
domain = models.CharField(max_length=250, null=True, default=None)
main = models.BooleanField(default=False)
client_id = models.CharField(
max_length=24,
default=set_client_id,
unique=True
)
client_secret = models.CharField(
max_length=48,
default=set_client_secret
)
my_client_id = models.CharField(
max_length=24,
)
my_client_secret = models.CharField(
max_length=48,
)
response_uri = models.URLField(
help_text=_("Url where to send the verificable presentation"),
max_length=250
)
encrypted_sensitive_data = models.CharField(max_length=255, default=None, null=True)
salt = models.CharField(max_length=255, default=None, null=True)
def send(self, vp, code):
"""
Send the verificable presentation to Verifier
"""
url = "{url}/verify".format(
url=self.response_uri.strip("/"),
)
auth = (self.my_client_id, self.my_client_secret)
data = {"vp_token": vp}
if code:
data["code"] = code
return requests.post(url, data=data, auth=auth)
def demand_authorization(self):
"""
Send the a request for start a process of Verifier
"""
url = "{url}/verify?demand_uri={redirect_uri}".format(
url=self.response_uri.strip("/"),
redirect_uri=self.response_uri
)
auth = (self.my_client_id, self.my_client_secret)
return requests.get(url, auth=auth)
def derive_key_from_password(self, password=None):
if not password:
password = cache.get("KEY_DIDS").encode('utf-8')
kdf = pwhash.argon2i.kdf
ops = pwhash.argon2i.OPSLIMIT_INTERACTIVE
mem = pwhash.argon2i.MEMLIMIT_INTERACTIVE
return kdf(
secret.SecretBox.KEY_SIZE,
password,
self.get_salt(),
opslimit=ops,
memlimit=mem
)
def decrypt_sensitive_data(self, data=None):
sb_key = self.derive_key_from_password()
sb = secret.SecretBox(sb_key)
if not data:
data = self.get_encrypted_sensitive_data()
if not isinstance(data, bytes):
data = data.encode('utf-8')
return sb.decrypt(data).decode('utf-8')
def encrypt_sensitive_data(self, data):
sb_key = self.derive_key_from_password()
sb = secret.SecretBox(sb_key)
if not isinstance(data, bytes):
data = data.encode('utf-8')
return base64.b64encode(sb.encrypt(data)).decode('utf-8')
def get_salt(self):
if not self.salt:
return ''
return base64.b64decode(self.salt.encode('utf-8'))
def set_salt(self):
self.salt = base64.b64encode(nacl.utils.random(16)).decode('utf-8')
def get_encrypted_sensitive_data(self):
return base64.b64decode(self.encrypted_sensitive_data.encode('utf-8'))
def set_encrypted_sensitive_data(self):
key = base64.b64encode(nacl.utils.random(64))
self.set_salt()
key_crypted = self.encrypt_sensitive_data(key)
self.encrypted_sensitive_data = key_crypted
def encrypt_data(self, data):
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value_enc = sb.encrypt(data.encode('utf-8'))
return base64.b64encode(value_enc).decode('utf-8')
def decrypt_data(self, data):
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value = base64.b64decode(data.encode('utf-8'))
return sb.decrypt(value).decode('utf-8')
def get_secret_box(self, password):
sb_key = self.derive_key_from_password(password=password)
return secret.SecretBox(sb_key)
def change_password_key(self, new_password):
data = self.decrypt_sensitive_data()
sb_key = self.derive_key_from_password(password=new_password)
sb = secret.SecretBox(sb_key)
if not isinstance(data, bytes):
data = data.encode('utf-8')
encrypted_data = base64.b64encode(sb.encrypt(data)).decode('utf-8')
self.encrypted_sensitive_data = encrypted_data
def __str__(self):
return self.name
###################
# Verifier clases #
###################
class Authorization(models.Model):
"""
This class represent a query through browser the client to the wallet.
The Verifier need to do a redirection to the user to Wallet.
The code we use as a soft foreing key between Authorization and OAuth2VPToken.
"""
code = models.CharField(max_length=24, default=set_code)
code_used = models.BooleanField(default=False)
created = models.DateTimeField(auto_now=True)
presentation_definition = models.CharField(max_length=250)
organization = models.ForeignKey(
Organization,
on_delete=models.CASCADE,
related_name='authorizations',
null=True,
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
null=True,
)
def authorize(self, path=None):
data = {
"response_type": "vp_token",
"response_mode": "direct_post",
"client_id": self.organization.my_client_id,
"presentation_definition": self.presentation_definition,
"code": self.code,
"nonce": gen_salt(5),
}
query_dict = QueryDict('', mutable=True)
query_dict.update(data)
response_uri = self.organization.response_uri.strip("/")
if path:
response_uri = "{}/{}".format(response_uri, path.strip("/"))
url = '{response_uri}/authorize?{params}'.format(
response_uri=response_uri,
params=query_dict.urlencode()
)
return url
class OAuth2VPToken(models.Model):
"""
This class represent the response of Wallet to Verifier
and the result of verify.
"""
created = models.DateTimeField(auto_now=True)
result_verify = models.CharField(max_length=255)
vp_token = models.TextField()
organization = models.ForeignKey(
Organization,
on_delete=models.CASCADE,
related_name='vp_tokens',
null=True,
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='vp_tokens',
null=True,
)
authorization = models.ForeignKey(
Authorization,
on_delete=models.SET_NULL,
related_name='vp_tokens',
null=True,
)
def __init__(self, *args, **kwargs):
code = kwargs.pop("code", None)
super().__init__(*args, **kwargs)
self.authorization = Authorization.objects.filter(code=code).first()
@property
def code(self):
if not self.authorization:
return ''
return self.authorization.code
def verifing(self):
self.result_verify = verify_presentation(self.vp_token)
def get_result_verify(self):
if not self.result_verify:
return {}
return json.loads(self.result_verify)
def get_response_verify(self):
response = {
"verify": ',',
"redirect_uri": "",
"response": "",
}
verification = json.loads(self.result_verify)
if verification.get('errors') or verification.get('warnings'):
response["verify"] = "Error, {}".format(_("Failed verification"))
return response
response["verify"] = "Ok, {}".format(_("Correct verification"))
url = self.get_redirect_url()
if url:
response["redirect_uri"] = url
return response
def get_redirect_url(self):
if not settings.OIDC_REDIRECT:
return
data = {
"code": self.authorization.code,
}
query_dict = QueryDict('', mutable=True)
query_dict.update(data)
response_uri = settings.ALLOW_CODE_URI
url = '{response_uri}?{params}'.format(
response_uri=response_uri,
params=query_dict.urlencode()
)
return url
def get_user_info(self):
tk = json.loads(self.vp_token)
self.user_info = tk.get(
"verifiableCredential", [{}]
)[-1].get("credentialSubject")
return json.dumps(self.user_info, indent=2)
def get_user_info_all(self):
tk = json.loads(self.vp_token)
return json.dumps(tk, indent=2)