add commands for setup to dlt

This commit is contained in:
Cayo Puigdefabregas 2024-11-19 21:17:48 +01:00
parent fb836edfbb
commit f6d1cf719c
7 changed files with 221 additions and 21 deletions

View File

@ -245,3 +245,4 @@ COMMIT = config('COMMIT', default='')
# DLT SETTINGS # DLT SETTINGS
TOKEN_DLT = config("TOKEN_DLT", default=None) TOKEN_DLT = config("TOKEN_DLT", default=None)
API_DLT = config("API_DLT", default=None) API_DLT = config("API_DLT", default=None)
API_RESULVER = config("API_RESOLVER", default=None)

View File

@ -1,10 +1,11 @@
import json
import time import time
import logging import logging
from django.conf import settings from django.conf import settings
from ereuseapi.methods import API from ereuseapi.methods import API
from dpp.models import Proof from dpp.models import Proof, UserDpp
logger = logging.getLogger('django') logger = logging.getLogger('django')
@ -29,22 +30,22 @@ PROOF_TYPE = {
} }
def connect_api(): def connect_api(user):
if not settings.TOKEN_DLT: dp = UserDpp.objects.filter(user=user).first()
if not dp:
return return
token_dlt = settings.TOKEN_DLT
api_dlt = settings.API_DLT api_dlt = settings.API_DLT
token_dlt = json.loads(dp).get("token_dlt")
if not api_dlt or not token_dlt:
logger.error("NOT POSSIBLE CONNECT WITH API DLT!!!")
return
return API(api_dlt, token_dlt, "ethereum") return API(api_dlt, token_dlt, "ethereum")
def register_dlt(chid, phid, proof_type=None): def register_dlt(api, chid, phid, proof_type=None):
api = connect_api()
if not api:
return
if proof_type: if proof_type:
return api.generate_proof( return api.generate_proof(
chid, chid,
@ -62,11 +63,8 @@ def register_dlt(chid, phid, proof_type=None):
) )
def issuer_dpp_dlt(dpp): def issuer_dpp_dlt(api, dpp):
phid = dpp.split(":")[0] phid = dpp.split(":")[0]
api = connect_api()
if not api:
return
return api.issue_passport( return api.issue_passport(
dpp, dpp,
@ -96,14 +94,14 @@ def save_proof(signature, ev_uuid, result, proof_type, user):
def register_device_dlt(chid, phid, ev_uuid, user): def register_device_dlt(chid, phid, ev_uuid, user):
token_dlt = settings.TOKEN_DLT
api_dlt = settings.API_DLT
if not token_dlt or not api_dlt:
return
cny_a = 1 cny_a = 1
while cny_a: while cny_a:
result = register_dlt(chid, phid) api = connect_api(user)
if not api:
cny_a = 0
return
result = register_dlt(api, chid, phid)
try: try:
assert result['Status'] == STATUS_CODE.get("Success") assert result['Status'] == STATUS_CODE.get("Success")
assert result['Data']['data']['timestamp'] assert result['Data']['data']['timestamp']
@ -148,7 +146,12 @@ def register_passport_dlt(chid, phid, ev_uuid, user):
cny_a = 1 cny_a = 1
while cny_a: while cny_a:
try: try:
result = issuer_dpp_dlt(dpp) api = connect_api(user)
if not api:
cny_a = 0
return
result = issuer_dpp_dlt(api, dpp)
cny_a = 0 cny_a = 0
except Exception as err: except Exception as err:
logger.error("ERROR API issue passport return: %s", err) logger.error("ERROR API issue passport return: %s", err)

View File

@ -0,0 +1,35 @@
import logging
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
from user.models import Institution
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Insert a new Institution in DLT"
def add_arguments(self, parser):
parser.add_argument('domain', type=str, help='institution')
def handle(self, *args, **kwargs):
domain = kwargs.get("domain")
api = settings.API_RESOLVER
if not api
logger.error("you need set the var API_RESOLVER")
return
if "http" not in domain:
logger.error("you need put https:// in %s", domain)
return
api = api.strip("/")
domain = domain.strip("/")
data = {"url": domain}
url = api + '/registerURL'
res = requests.post(url, json=data)
print(res.json())

View File

@ -0,0 +1,72 @@
import json
import logging
from ereuseapi.methods import API
from django.conf import settings
from django.core.management.base import BaseCommand
from user.models import User, Institution
from dpp.models import UserDpp
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Insert users than are in Dlt with params: path of data set file"
def add_arguments(self, parser):
parser.add_argument('dataset_file', type=str, help='institution')
def handle(self, *args, **kwargs):
dataset_file = kwargs.get("dataset_file")
self.api_dlt = settings.API_DLT
self.institution = Institution.objects.filter().first()
if not self.api_dlt:
logger.error("you need set the var API_DLT")
return
self.api_dlt = self.api_dlt.strip("/")
with open(dataset_file) as f:
dataset = json.loads(f.read())
self.add_user(dataset)
def add_user(self, data):
email = data.get("email")
password = data.get("password")
api_token = data.get("api_token")
# ethereum = {"data": {"api_token": api_token}}
# data_eth = json.dumps(ethereum)
data_eth = json.dumps(api_token)
# TODO encrypt in the future
# api_keys_dlt = encrypt(password, data_eth)
api_keys_dlt = data_eth
user = User.objects.filter(email=email).first()
if not user:
user = User.objects.create(
email=email,
password=password,
institution = self.institution
)
roles = []
token_dlt = api_token
api = API(self.api_dlt, token_dlt, "ethereum")
result = api.check_user_roles()
if result.get('Status') == 200:
if 'Success' in result.get('Data', {}).get('status'):
rols = result.get('Data', {}).get('data', {})
roles = [(k, k) for k, v in rols.items() if v]
roles_dlt = json.dumps(roles)
UserDpp.objects.create(
roles_dlt=roles_dlt,
api_keys_dlt=api_keys_dlt,
user=user
)

View File

@ -0,0 +1,47 @@
import logging
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
from dpp.models import MemberFederated
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Synchronize members of DLT"
def handle(self, *args, **kwargs):
api = settings.API_RESOLVER
if not api
logger.error("you need set the var API_RESOLVER")
return
api = api.strip("/")
url = api + '/getAll'
res = requests.get(url)
if res.status_code != 200:
return "Error, {}".format(res.text)
response = res.json()
members = response['url']
counter = members.pop('counter')
if counter <= MemberFederated.objects.count():
logger.info("Synchronize members of DLT -> All Ok")
return "All ok"
for k, v in members.items():
id = self.clean_id(k)
member = MemberFederated.objects.filter(dlt_id_provider=id).first()
if member:
if member.domain != v:
member.domain = v
member.save()
continue
MemberFederated.objects.create(dlt_id_provider=id, domain=v)
return res.text
def clean_id(self, id):
return int(id.split('DH')[-1])

View File

@ -0,0 +1,25 @@
# Generated by Django 5.0.6 on 2024-11-19 19:18
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("dpp", "0001_initial"),
]
operations = [
migrations.CreateModel(
name="MemberFederated",
fields=[
(
"dlt_id_provider",
models.IntegerField(primary_key=True, serialize=False),
),
("domain", models.CharField(max_length=256)),
("client_id", models.CharField(max_length=256)),
("client_secret", models.CharField(max_length=256)),
],
),
]

View File

@ -13,3 +13,20 @@ class Proof(models.Model):
issuer = models.ForeignKey(Institution, on_delete=models.CASCADE) issuer = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey( user = models.ForeignKey(
User, on_delete=models.SET_NULL, null=True, blank=True) User, on_delete=models.SET_NULL, null=True, blank=True)
class MemberFederated(models.Model):
dlt_id_provider = models.IntegerField(primary_key=True)
domain = models.CharField(max_length=STR_EXTEND_SIZE)
# This client_id and client_secret is used for connected to this domain as
# a client and this domain then is the server of auth
client_id = models.CharField(max_length=STR_EXTEND_SIZE. null=True)
client_secret = models.CharField(max_length=STR_EXTEND_SIZE, null=True)
institution = models.ForeignKey(
Institution, on_delete=models.SET_NULL, null=True, blank=True)
class UserDpp(models.Model):
roles_dlt = models.TextField()
api_keys_dlt = models.TextField()
user = models.ForeignKey(User, on_delete=models.CASCADE)