Merge pull request 'api_v1' (#15) from api_v1 into main

Reviewed-on: #15
This commit is contained in:
cayop 2024-10-24 10:00:10 +00:00
commit 1a2f1249ff
4 changed files with 216 additions and 76 deletions

View File

@ -6,9 +6,11 @@ from django.urls import path
app_name = 'api' app_name = 'api'
urlpatterns = [ urlpatterns = [
path('snapshot/', views.NewSnapshot, name='new_snapshot'), path('v1/snapshot/', views.NewSnapshotView.as_view(), name='new_snapshot'),
path('tokens/', views.TokenView.as_view(), name='tokens'), path('v1/annotation/<str:pk>/', views.AddAnnotationView.as_view(), name='new_annotation'),
path('tokens/new', views.TokenNewView.as_view(), name='new_token'), path('v1/device/<str:pk>/', views.DetailsDeviceView.as_view(), name='device'),
path("tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"), path('v1/tokens/', views.TokenView.as_view(), name='tokens'),
path('tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'), path('v1/tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("v1/tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),
path('v1/tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'),
] ]

View File

@ -1,4 +1,6 @@
import json import json
import uuid
import logging
from uuid import uuid4 from uuid import uuid4
@ -7,6 +9,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django_tables2 import SingleTableView from django_tables2 import SingleTableView
from django.views.generic.edit import ( from django.views.generic.edit import (
CreateView, CreateView,
@ -15,81 +18,118 @@ from django.views.generic.edit import (
) )
from utils.save_snapshots import move_json, save_in_disk from utils.save_snapshots import move_json, save_in_disk
from django.views.generic.edit import View
from dashboard.mixins import DashboardView from dashboard.mixins import DashboardView
from evidence.models import Annotation from evidence.models import Annotation
from evidence.parse_details import ParseSnapshot
from evidence.parse import Build from evidence.parse import Build
from device.models import Device
from api.models import Token from api.models import Token
from api.tables import TokensTable from api.tables import TokensTable
@csrf_exempt logger = logging.getLogger('django')
def NewSnapshot(request):
# Accept only posts
if request.method != 'POST':
return JsonResponse({'error': 'Invalid request method'}, status=400)
# Authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1]
tk = Token.objects.filter(token=token).first()
if not tk:
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
return JsonResponse({'status': txt}, status=500)
# Process snapshot
path_name = save_in_disk(data, tk.owner.institution.name)
try:
Build(data, tk.owner)
except Exception as err:
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=tk.owner.institution
).first()
if not annotation: class ApiMixing(View):
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,)) @method_decorator(csrf_exempt)
url = request.build_absolute_uri(url_args) def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
response = { def auth(self):
"status": "success", # Authentication
"dhid": annotation.value[:6].upper(), auth_header = self.request.headers.get('Authorization')
"url": url, if not auth_header or not auth_header.startswith('Bearer '):
# TODO replace with public_url when available logger.exception("Invalid or missing token {}".format(auth_header))
"public_url": url return JsonResponse({'error': 'Invalid or missing token'}, status=401)
}
move_json(path_name, tk.owner.institution.name)
return JsonResponse(response, status=200) token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
self.tk = Token.objects.filter(token=token).first()
if not self.tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
class NewSnapshotView(ApiMixing):
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
path_name = save_in_disk(data, self.tk.owner.institution.name)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
try:
Build(data, self.tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=self.tk.owner.institution
).first()
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, self.tk.owner.institution.name)
return JsonResponse(response, status=200)
class TokenView(DashboardView, SingleTableView): class TokenView(DashboardView, SingleTableView):
@ -165,3 +205,99 @@ class EditTokenView(DashboardView, UpdateView):
) )
kwargs = super().get_form_kwargs() kwargs = super().get_form_kwargs()
return kwargs return kwargs
class DetailsDeviceView(ApiMixing):
def get(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
return JsonResponse({}, status=404)
if self.object.owner != self.tk.owner.institution:
return JsonResponse({}, status=403)
data = self.get_data()
return JsonResponse(data, status=200)
def post(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def get_data(self):
data = {}
self.object.initial()
self.object.get_last_evidence()
evidence = self.object.last_evidence
if evidence.is_legacy():
data.update({
"device": evidence.get("device"),
"components": evidence.get("components"),
})
else:
evidence.get_doc()
snapshot = ParseSnapshot(evidence.doc).snapshot_json
data.update({
"device": snapshot.get("device"),
"components": snapshot.get("components"),
})
uuids = Annotation.objects.filter(
owner=self.tk.owner.institution,
value=self.pk
).values("uuid")
annotations = Annotation.objects.filter(
uuid__in=uuids,
owner=self.tk.owner.institution,
type = Annotation.Type.USER
).values_list("key", "value")
data.update({"annotations": list(annotations)})
return data
class AddAnnotationView(ApiMixing):
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
institution = self.tk.owner.institution
self.annotation = Annotation.objects.filter(
owner=institution,
value=self.pk,
type=Annotation.Type.SYSTEM
).first()
if not self.annotation:
return JsonResponse({}, status=404)
try:
data = json.loads(request.body)
key = data["key"]
value = data["value"]
except Exception:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
Annotation.objects.create(
uuid=self.annotation.uuid,
owner=self.tk.owner.institution,
type = Annotation.Type.USER,
key = key,
value = value
)
return JsonResponse({"status": "success"}, status=200)
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)

View File

@ -1,8 +1,7 @@
from django.db import models, connection from django.db import models, connection
from utils.constants import STR_SM_SIZE, STR_SIZE, STR_EXTEND_SIZE, ALGOS from utils.constants import ALGOS
from evidence.models import Annotation, Evidence from evidence.models import Annotation, Evidence
from user.models import User
from lot.models import DeviceLot from lot.models import DeviceLot

View File

@ -3,7 +3,7 @@ import json
from dmidecode import DMIParse from dmidecode import DMIParse
from django.db import models from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search from evidence.xapian import search
from evidence.parse_details import ParseSnapshot from evidence.parse_details import ParseSnapshot
from user.models import User, Institution from user.models import User, Institution
@ -67,7 +67,7 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if self.doc.get("software") == "workbench-script": if not self.is_legacy():
dmidecode_raw = self.doc["data"]["dmidecode"] dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw) self.dmi = DMIParse(dmidecode_raw)
@ -80,7 +80,7 @@ class Evidence:
self.created = self.annotations.last().created self.created = self.annotations.last().created
def get_components(self): def get_components(self):
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc.get('components', []) return self.doc.get('components', [])
self.set_components() self.set_components()
return self.components return self.components
@ -92,7 +92,7 @@ class Evidence:
return "" return ""
return list(self.doc.get('kv').values())[0] return list(self.doc.get('kv').values())[0]
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['manufacturer'] return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
@ -104,13 +104,13 @@ class Evidence:
return "" return ""
return list(self.doc.get('kv').values())[1] return list(self.doc.get('kv').values())[1]
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual') chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
@ -132,3 +132,6 @@ class Evidence:
def set_components(self): def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components'] self.components = snapshot['components']
def is_legacy(self):
return self.doc.get("software") != "workbench-script"