import json
import base64
import jsonschema
import pandas as pd

from nacl.exceptions import CryptoError
from openpyxl import load_workbook
from django import forms
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError

from utils import certs
from idhub.models import (
    DID,
    File_datas,
    Membership,
    Schemas,
    UserRol,
    VerificableCredential,
)
from idhub_auth.models import User


class TermsConditionsForm2(forms.Form):
    """Single-checkbox form that stores the answer in ``user.accept_gdpr``.

    The target user is passed in through the ``user`` keyword argument.
    """

    accept = forms.BooleanField(
        label=_("Accept terms and conditions of the service"),
        required=False
    )

    def __init__(self, *args, **kwargs):
        # ``user`` is not a regular form argument, so remove it before
        # delegating to the base constructor.
        self.user = kwargs.pop('user', None)
        super().__init__(*args, **kwargs)

    def clean(self):
        """Mirror the checkbox onto ``self.user.accept_gdpr`` (not saved)."""
        data = self.cleaned_data
        self.user.accept_gdpr = bool(data.get("accept"))
        return data

    def save(self, commit=True):
        """Save and return the user when ``commit`` is true, else ``None``."""
        if commit:
            self.user.save()
            return self.user

        return


class EncryptionKeyForm(forms.Form):
    """Ask for the system key and check it against the first stored DID."""

    key = forms.CharField(
        label=_("Key for encrypt the secrets of all system"),
        required=True
    )

    # Key accepted by ``clean``; stays ``None`` until a value was submitted.
    _key = None

    def clean(self):
        """Validate the submitted key.

        When at least one DID exists, the key is temporarily stored in the
        cache under ``KEY_DIDS`` and the first DID's key material is read
        (presumably decrypted with that cached key -- confirm in the DID
        model). A ``CryptoError`` is reported as a validation error.

        Raises:
            ValidationError: if reading the key material raises CryptoError.
        """
        data = self.cleaned_data
        # Form-level clean() also runs when the field itself failed
        # validation; in that case "key" is absent from cleaned_data.
        self._key = data.get("key")
        if not self._key:
            return data

        if not DID.objects.exists():
            # Nothing to check the key against yet.
            return data

        did = DID.objects.first()
        cache.set("KEY_DIDS", self._key, None)
        try:
            did.get_key_material()
        except CryptoError:
            txt = _("Key no valid!")
            raise ValidationError(txt)
        finally:
            # Never leave the unconfirmed key in the cache, whatever
            # happened while reading the key material.
            cache.set("KEY_DIDS", None)

        return data

    def save(self, commit=True):
        """Store the key in the cache and create a default DID if none exists.

        Nothing happens when ``commit`` is false. Always returns ``None``.
        """
        if commit:
            cache.set("KEY_DIDS", self._key, None)
            if not DID.objects.exists():
                did = DID.objects.create(label='Default', type=DID.Types.WEB)
                did.set_did()
                did.save()

        return


class TermsConditionsForm(forms.Form):
    """Three-checkbox consent form (privacy, legal and cookies policies).

    ``user.accept_gdpr`` becomes true only when all three boxes are ticked.
    """

    accept_privacy = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )
    accept_legal = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )
    accept_cookies = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )

    def __init__(self, *args, **kwargs):
        # ``user`` is not a regular form argument, so remove it before
        # delegating to the base constructor.
        self.user = kwargs.pop('user', None)
        super().__init__(*args, **kwargs)

    def get_label(self, url, read):
        """Return the checkbox label text for the policy named ``read``.

        NOTE(review): ``url`` is accepted but not used when building the
        label -- confirm whether a link to the policy was intended.
        """
        label = _('I read and accepted the')
        label += f' {read}'
        return label

    def privacy_label(self):
        """Label for the privacy policy checkbox."""
        url = "https://laweb.pangea.org/politica-de-privacitat/"
        read = _("Privacy policy")
        return self.get_label(url, read)

    def legal_label(self):
        """Label for the legal notice checkbox."""
        url = "https://laweb.pangea.org/avis-legal/"
        read = _("Legal policy")
        return self.get_label(url, read)

    def cookies_label(self):
        """Label for the cookies policy checkbox."""
        url = "https://laweb.pangea.org/politica-de-cookies-2/"
        read = _("Cookies policy")
        return self.get_label(url, read)

    def clean(self):
        """Set ``self.user.accept_gdpr`` from the three checkboxes (not saved)."""
        data = self.cleaned_data
        accepted = (
            data.get("accept_privacy"),
            data.get("accept_legal"),
            data.get("accept_cookies"),
        )
        self.user.accept_gdpr = all(accepted)
        return data

    def save(self, commit=True):
        """Save and return the user when ``commit`` is true, else ``None``."""
        if commit:
            self.user.save()
            return self.user

        return


class ImportForm(forms.Form):
    """Import credential data from an Excel file against a chosen schema.

    Every non-empty row of the file is validated with the schema's
    credential-subject JSON schema; ``save`` then builds one credential per
    validated row.
    """

    did = forms.ChoiceField(label=_("Did"), choices=[])
    eidas1 = forms.ChoiceField(
        label=_("Signature with Eidas1"),
        choices=[],
        required=False
    )
    schema = forms.ChoiceField(label=_("Schema"), choices=[])
    file_import = forms.FileField(label=_("File to import"))

    def __init__(self, *args, **kwargs):
        self._schema = None
        self._did = None
        self._eidas1 = None
        # Set by clean_schema / clean_file_import once they succeed.
        self.json_schema = None
        self.file_name = None
        self.rows = {}
        self.properties = {}
        self.users = []
        super().__init__(*args, **kwargs)

        # Only DIDs that are not attached to a user are offered.
        dids = DID.objects.filter(user__isnull=True)
        self.fields['did'].choices = [
            (x.did, x.label) for x in dids.filter(eidas1=False)
        ]

        txt_select_one = _("Please choose a data schema ...")
        self.fields['schema'].choices = [(0, txt_select_one)] + [
            (x.id, x.name) for x in Schemas.objects.filter()
        ]

        eidas_choices = [
            (x.did, x.label) for x in dids.filter(eidas1=True)
        ]
        if eidas_choices:
            # Leading blank entry: the eIDAS1 signature is optional.
            self.fields['eidas1'].choices = [("", "")] + eidas_choices
        else:
            self.fields.pop('eidas1')

    def clean(self):
        """Resolve the selected issuer DID and the optional eIDAS1 DID.

        Returns:
            The form's ``cleaned_data`` dict. Django assigns whatever
            ``clean()`` returns to ``self.cleaned_data``, so this must be
            the dict and not a single field value.

        Raises:
            ValidationError: if the chosen DID is not a user-less DID.
        """
        cleaned = self.cleaned_data
        did_value = cleaned.get("did")
        if did_value is None:
            # The "did" field already failed its own validation and that
            # error is recorded; there is nothing to resolve.
            return cleaned

        did = DID.objects.filter(
            user__isnull=True,
            did=did_value
        )
        if not did.exists():
            raise ValidationError(_("Did not valid!"))
        self._did = did.first()

        eidas1 = cleaned.get('eidas1')
        if eidas1:
            self._eidas1 = DID.objects.filter(
                user__isnull=True,
                eidas1=True,
                did=eidas1
            ).first()

        return cleaned

    def clean_schema(self):
        """Load the chosen schema and its credential-subject JSON schema.

        ``self._schema`` and ``self.json_schema`` are only set when both
        steps succeed, so later cleaners never see a half-initialised state.

        Raises:
            ValidationError: if the schema id is unknown or its JSON schema
                cannot be obtained.
        """
        data = self.cleaned_data["schema"]
        schema = Schemas.objects.filter(id=data).first()
        if schema is None:
            raise ValidationError(_("Schema is not valid!"))

        try:
            json_schema = schema.get_credential_subject_schema()
        except Exception:
            raise ValidationError(_("Schema not valid!"))

        self._schema = schema
        self.json_schema = json_schema
        return data

    def clean_file_import(self):
        """Read the uploaded Excel file and validate each non-empty row.

        Validated rows are stored in ``self.rows`` keyed by the user that
        ``validate_jsonld`` returns for the row.

        Raises:
            ValidationError: if the workbook cannot be opened or declares a
                different schema, if the file has no rows, or if a row does
                not validate against the JSON schema.
        """
        data = self.cleaned_data["file_import"]
        if not self._schema or self.json_schema is None:
            # The schema field failed validation; its error is already
            # recorded and the rows cannot be validated without a schema.
            return data

        self.file_name = data.name
        props = self.json_schema.get("properties", {})

        # Force pandas to read these columns as strings.
        dtype_dict = {
            "phoneNumber": str,
            "phone": str,
            'postCode': str
        }
        df = pd.read_excel(data, dtype=dtype_dict)
        df.fillna('', inplace=True)

        txt = _("This File does not correspond to this scheme!")
        try:
            workbook = load_workbook(data)
            custom_props = workbook.custom_doc_props
            # A workbook without a 'Schema' custom property is a custom
            # Excel file we have no control over, so it is accepted as is.
            schema_matches = True
            if 'Schema' in custom_props.names:
                excel_schema = custom_props['Schema'].value
                file_schema = self._schema.file_schema.split('.json')[0]
                schema_matches = file_schema in excel_schema
        except Exception:
            # Any failure while inspecting the workbook is reported as a
            # schema mismatch.
            raise ValidationError(txt)
        if not schema_matches:
            # Explicit check rather than ``assert``: assertions are removed
            # when Python runs with -O.
            raise ValidationError(txt)

        # Convert dates to ISO 8601.
        for col in df.select_dtypes(include='datetime').columns:
            df[col] = df[col].dt.strftime("%Y-%m-%d")

        df.fillna('', inplace=True)

        # Convert values to strings where the schema says so.
        for col, spec in props.items():
            if col not in df.columns:
                continue
            # A property may have no "type" key at all.
            if "string" in spec.get("type", ()):
                df[col] = df[col].astype(str)

            # TODO @cayop empty cells are '' at this point, which is why
            # df[col].astype(int) crashes:
            # elif "integer" in props[col]["type"]:
            #     df[col] = df[col].astype(int)
            # elif "number" in props[col]["type"]:
            #     df[col] = df[col].astype(float)

        data_pd = df.to_dict(orient='index')

        if not data_pd or df.last_valid_index() is None:
            self.exception(_("The file you try to import is empty!"))

        for n, cells in data_pd.items():
            # Keep only filled cells; 0 is a legitimate value.
            row = {k: v for k, v in cells.items() if v or v == 0}
            if row:
                # n + 2 is the line number reported in validation errors.
                user = self.validate_jsonld(n+2, row)
                if user:
                    self.rows[user] = row

        return data

    def save(self, commit=True):
        """Build one credential per validated row.

        Returns:
            The list of credentials when ``commit`` is true (after saving
            them and recording the import in ``File_datas``), else ``None``.
        """
        table = [
            self.create_credential(user, row)
            for user, row in self.rows.items()
        ]

        if commit:
            for cred in table:
                cred.save()
            File_datas.objects.create(file_name=self.file_name)
            return table

        return

    def validate_jsonld(self, line, row):
        """Validate ``row`` against the JSON schema and return its user.

        The user is looked up (or created) by the row's ``email`` value; a
        newly created user also gets its sensitive data set and a default
        DID.

        Raises:
            ValidationError: (through ``self.exception``) when the row does
                not validate; the message is prefixed with ``line``.
        """
        try:
            jsonschema.validate(
                instance=row,
                schema=self.json_schema,
                format_checker=jsonschema.Draft202012Validator.FORMAT_CHECKER
            )
        except jsonschema.exceptions.ValidationError as err:
            msg = "line {}: {}".format(line, err.message)
            return self.exception(msg)

        user, new = User.objects.get_or_create(email=row.get('email'))
        if new:
            self.users.append(user)
            user.set_encrypted_sensitive_data()
            user.save()
            self.create_defaults_dids(user)
        return user

    def create_defaults_dids(self, user):
        """Create and save a web DID labelled "Default" for ``user``."""
        did = DID(label="Default", user=user, type=DID.Types.WEB)
        did.set_did()
        did.save()

    def create_credential(self, user, row):
        """Return an unsaved credential for ``user`` filled with ``row``.

        An enabled credential for the same user, schema and issuer DID is
        reused (its data and eIDAS1 DID are overwritten); otherwise a new
        one is built.
        """
        # default=str: values JSON cannot serialise are stored as text.
        csv_data = json.dumps(row, default=str)

        bcred = VerificableCredential.objects.filter(
            user=user,
            schema=self._schema,
            issuer_did=self._did,
            status=VerificableCredential.Status.ENABLED
        )
        if bcred.exists():
            cred = bcred.first()
            cred.csv_data = csv_data
            cred.eidas1_did = self._eidas1
            return cred

        cred = VerificableCredential(
            verified=False,
            user=user,
            csv_data=csv_data,
            issuer_did=self._did,
            schema=self._schema,
            eidas1_did=self._eidas1
        )
        cred.set_type()
        return cred

    def exception(self, msg):
        """Record a failed import in ``File_datas`` and raise ``msg``.

        Raises:
            ValidationError: always.
        """
        File_datas.objects.create(file_name=self.file_name, success=False)
        raise ValidationError(msg)


class SchemaForm(forms.Form):
    """Upload form for a schema template file."""

    file_template = forms.FileField(label=_("File template"))


class MembershipForm(forms.ModelForm):
    """Create or edit a membership, rejecting overlaps for the same user."""

    class Meta:
        model = Membership
        fields = ['type', 'start_date', 'end_date']

    def clean_end_date(self):
        """Check the date range against the user's other memberships.

        Memberships of the same type and user (excluding the one being
        edited) are compared with the submitted range.

        Raises:
            forms.ValidationError: if the end date precedes the start date
                or the range collides with an existing membership.
        """
        data = self.cleaned_data
        membership_type = data.get('type')
        start_date = data.get('start_date')
        end_date = data.get('end_date')

        if membership_type is None or start_date is None:
            # Either the field failed its own validation (the error is
            # already recorded) or no start date was given; the range
            # checks below need both values.
            # NOTE(review): confirm that a membership without a start date
            # should skip the overlap checks.
            return end_date

        members = Membership.objects.filter(
            type=membership_type,
            user=self.instance.user
        )
        if self.instance.id:
            members = members.exclude(id=self.instance.id)

        # An open-ended membership that started on or before this one.
        if members.filter(start_date__lte=start_date, end_date=None).exists():
            msg = _("This membership already exists!")
            raise forms.ValidationError(msg)

        if end_date:
            if start_date > end_date:
                msg = _("The end date is less than the start date")
                raise forms.ValidationError(msg)

            # Closed range: any membership intersecting [start, end].
            overlapping = members.filter(
                start_date__lte=end_date,
                end_date__gte=start_date,
            )
        else:
            # Open range: any membership starting on or after this one.
            overlapping = members.filter(
                start_date__gte=start_date,
            )

        if overlapping.exists():
            msg = _("This membership already exists!")
            raise forms.ValidationError(msg)

        return end_date


class UserRolForm(forms.ModelForm):
    """Assign a service to a user, refusing duplicate assignments."""

    class Meta:
        model = UserRol
        fields = ['service']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.instance.id:
            # New assignment: drop the services matched by the
            # ``users__user`` lookup for this user from the choices.
            user = self.instance.user
            choices = self.fields['service'].choices
            choices.queryset = choices.queryset.exclude(users__user=user)
            self.fields['service'].choices = choices

    def clean_service(self):
        """Reject a service the user already has.

        NOTE(review): the instance being edited is not excluded from the
        lookup, so re-saving an unchanged assignment is reported as a
        duplicate -- confirm this is intended.

        Raises:
            forms.ValidationError: if the user already has this service.
        """
        data = super().clean()
        service = UserRol.objects.filter(
            service=data['service'],
            user=self.instance.user
        )

        if service.exists():
            msg = _("Is not possible to have a duplicate role")
            raise forms.ValidationError(msg)

        return data['service']


class ImportCertificateForm(forms.Form):
    """Upload a password-protected certificate and wrap it in a DID."""

    label = forms.CharField(label=_("Label"))
    password = forms.CharField(
        label=_("Password of certificate"),
        widget=forms.PasswordInput
    )
    file_import = forms.FileField(label=_("File import"))

    def __init__(self, *args, **kwargs):
        self._did = None
        self._s = None
        self._label = None
        # Filled in by clean().
        self._pss = None
        self.pfx_file = None
        self.file_name = None
        super().__init__(*args, **kwargs)

    def clean(self):
        """Load the uploaded certificate and prepare the DID to store.

        Raises:
            forms.ValidationError: if the file or password is missing, or
                the certificate could not be loaded.
        """
        data = super().clean()
        file_import = data.get('file_import')
        self._pss = data.get('password')
        self._label = data.get('label')

        if file_import is None:
            # The file field failed its own validation, so there is
            # nothing to read.
            msg = _("Is not a valid certificate")
            raise forms.ValidationError(msg)

        self.pfx_file = file_import.read()
        self.file_name = file_import.name

        if not self.pfx_file or not self._pss:
            msg = _("Is not a valid certificate")
            raise forms.ValidationError(msg)

        self.signer_init()
        if not self._s:
            msg = _("Is not a valid certificate")
            raise forms.ValidationError(msg)

        self.new_did()
        return data

    def new_did(self):
        """Build the unsaved eIDAS1 DID that holds the certificate.

        The key material is a JSON document with the base64-encoded
        certificate and its passphrase; the uploaded file name is used as
        the DID value.
        """
        keys = {
            "cert": base64.b64encode(self.pfx_file).decode('utf-8'),
            "passphrase": self._pss
        }
        key_material = json.dumps(keys)
        self._did = DID(
            key_material=key_material,
            did=self.file_name,
            label=self._label,
            eidas1=True,
            type=DID.Types.KEY
        )

        self._did.set_key_material(key_material)

    def save(self, commit=True):
        """Save and return the DID when ``commit`` is true, else ``None``."""
        if commit:
            self._did.save()
            return self._did

        return

    def signer_init(self):
        """Load the certificate with the password into ``self._s``."""
        self._s = certs.load_cert(
            self.pfx_file, self._pss.encode('utf-8')
        )