From 26589b1ee5e2476f77c29e88d64ea8219ad11f05 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Mon, 1 Jul 2024 12:17:23 +0200 Subject: [PATCH] modify snapshots --- snapshot/models.py | 1 + snapshot/parse.py | 563 ++++++++++++++++++++++++++++++++++++++++ snapshot/serializers.py | 39 ++- snapshot/views.py | 12 +- 4 files changed, 607 insertions(+), 8 deletions(-) create mode 100644 snapshot/parse.py diff --git a/snapshot/models.py b/snapshot/models.py index 6bb81ce..252cb98 100644 --- a/snapshot/models.py +++ b/snapshot/models.py @@ -29,3 +29,4 @@ class Snapshot(models.Model): owner = models.ForeignKey(User, on_delete=models.CASCADE) computer = models.ForeignKey(Computer, on_delete=models.CASCADE) components = models.ManyToManyField(Component) + diff --git a/snapshot/parse.py b/snapshot/parse.py new file mode 100644 index 0000000..403afc3 --- /dev/null +++ b/snapshot/parse.py @@ -0,0 +1,563 @@ +# from dmidecode import DMIParse + + +class Parse: + def __init__(self, snapshot_json): + self.json = snapshot_json + + +# class ParseSnapshot: +# def __init__(self, snapshot, default="n/a"): +# self.default = default +# self.dmidecode_raw = snapshot["hwmd"]["dmidecode"] +# self.smart_raw = snapshot["hwmd"]["smart"] +# self.hwinfo_raw = snapshot["hwmd"]["hwinfo"] +# self.lshw_raw = snapshot["hwmd"]["lshw"] +# self.lscpi_raw = snapshot["hwmd"]["lspci"] +# self.sanitize_raw = snapshot.get("sanitize", []) +# self.device = {"actions": []} +# self.components = [] +# self.monitors = [] + +# self.dmi = DMIParse(self.dmidecode_raw) +# self.smart = self.loads(self.smart_raw) +# self.lshw = self.loads(self.lshw_raw) +# self.hwinfo = self.parse_hwinfo() + +# self.set_computer() +# self.get_hwinfo_monitors() +# self.set_components() +# self.snapshot_json = { +# "type": "Snapshot", +# "device": self.device, +# "software": "Workbench", +# # "software": snapshot["software"], +# "components": self.components, +# "uuid": snapshot['uuid'], +# "version": "15.0.0", +# # "version": snapshot['version'], +# "settings_version": snapshot['settings_version'], +# "endTime": snapshot["timestamp"], +# "elapsed": 1, +# "sid": snapshot["sid"], +# } + +# def get_snapshot(self): +# return Snapshot().load(self.snapshot_json) + +# def set_computer(self): +# self.device['manufacturer'] = self.dmi.manufacturer().strip() +# self.device['model'] = self.dmi.model().strip() +# self.device['serialNumber'] = self.dmi.serial_number() +# self.device['type'] = self.get_type() +# self.device['sku'] = self.get_sku() +# self.device['version'] = self.get_version() +# self.device['system_uuid'] = self.get_uuid() +# self.device['family'] = self.get_family() +# self.device['chassis'] = self.get_chassis_dh() + +# def set_components(self): +# self.get_cpu() +# self.get_ram() +# self.get_mother_board() +# self.get_graphic() +# self.get_data_storage() +# self.get_display() +# self.get_sound_card() +# self.get_networks() + +# def get_cpu(self): +# for cpu in self.dmi.get('Processor'): +# serial = cpu.get('Serial Number') +# if serial == 'Not Specified' or not serial: +# serial = cpu.get('ID').replace(' ', '') +# self.components.append( +# { +# "actions": [], +# "type": "Processor", +# "speed": self.get_cpu_speed(cpu), +# "cores": int(cpu.get('Core Count', 1)), +# "model": cpu.get('Version'), +# "threads": int(cpu.get('Thread Count', 1)), +# "manufacturer": cpu.get('Manufacturer'), +# "serialNumber": serial, +# "generation": None, +# "brand": cpu.get('Family'), +# "address": self.get_cpu_address(cpu), +# } +# ) + +# def get_ram(self): +# for ram in self.dmi.get("Memory Device"): +# if ram.get('size') == 'No Module Installed': +# continue +# if not ram.get("Speed"): +# continue + +# self.components.append( +# { +# "actions": [], +# "type": "RamModule", +# "size": self.get_ram_size(ram), +# "speed": self.get_ram_speed(ram), +# "manufacturer": ram.get("Manufacturer", self.default), +# "serialNumber": ram.get("Serial Number", self.default), +# "interface": ram.get("Type", "DDR"), +# "format": ram.get("Form Factor", "DIMM"), +# "model": ram.get("Part Number", self.default), +# } +# ) + +# def get_mother_board(self): +# for moder_board in self.dmi.get("Baseboard"): +# self.components.append( +# { +# "actions": [], +# "type": "Motherboard", +# "version": moder_board.get("Version"), +# "serialNumber": moder_board.get("Serial Number", "").strip(), +# "manufacturer": moder_board.get("Manufacturer", "").strip(), +# "biosDate": self.get_bios_date(), +# "ramMaxSize": self.get_max_ram_size(), +# "ramSlots": len(self.dmi.get("Memory Device")), +# "slots": self.get_ram_slots(), +# "model": moder_board.get("Product Name", "").strip(), +# "firewire": self.get_firmware_num(), +# "pcmcia": self.get_pcmcia_num(), +# "serial": self.get_serial_num(), +# "usb": self.get_usb_num(), +# } +# ) + +# def get_graphic(self): +# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'display') +# for c in nodes: +# if not c['configuration'].get('driver', None): +# continue + +# self.components.append( +# { +# "actions": [], +# "type": "GraphicCard", +# "memory": self.get_memory_video(c), +# "manufacturer": c.get("vendor", self.default), +# "model": c.get("product", self.default), +# "serialNumber": c.get("serial", self.default), +# } +# ) + +# def get_memory_video(self, c): +# # get info of lspci +# # pci_id = c['businfo'].split('@')[1] +# # lspci.get(pci_id) | grep size +# # lspci -v -s 00:02.0 +# return None + +# def get_data_storage(self): +# for sm in self.smart: +# if sm.get('smartctl', {}).get('exit_status') == 1: +# continue +# model = sm.get('model_name') +# manufacturer = None +# if model and len(model.split(" ")) > 1: +# mm = model.split(" ") +# model = mm[-1] +# manufacturer = " ".join(mm[:-1]) + +# self.components.append( +# { +# "actions": self.sanitize(sm), +# "type": self.get_data_storage_type(sm), +# "model": model, +# "manufacturer": manufacturer, +# "serialNumber": sm.get('serial_number'), +# "size": self.get_data_storage_size(sm), +# "variant": sm.get("firmware_version"), +# "interface": self.get_data_storage_interface(sm), +# } +# ) + +# def sanitize(self, disk): +# disk_sanitize = None +# for d in self.sanitize_raw: +# s = d.get('device_info', {}).get('export_data', {}) +# s = s.get('block', {}).get('serial') +# if s == disk.get('serial_number'): +# disk_sanitize = d +# break +# if not disk_sanitize: +# return [] + +# steps = [] +# step_type = 'EraseBasic' +# if d.get("method", {}).get('name') == 'Baseline Cryptographic': +# step_type = 'EraseCrypto' + +# if disk.get('type') == 'EraseCrypto': +# step_type = 'EraseCrypto' + +# erase = { +# 'type': step_type, +# 'severity': "Info", +# 'steps': steps, +# 'startTime': None, +# 'endTime': None, +# } +# severities = [] + +# for step in disk_sanitize.get('steps', []): +# severity = "Info" +# if not step['success']: +# severity = "Error" + +# steps.append( +# { +# 'severity': severity, +# 'startTime': unix_isoformat(step['start_time']), +# 'endTime': unix_isoformat(step['end_time']), +# 'type': 'StepRandom', +# } +# ) +# severities.append(severity) + +# erase['endTime'] = unix_isoformat(step['end_time']) +# if not erase['startTime']: +# erase['startTime'] = unix_isoformat(step['start_time']) + +# if "Error" in severities: +# erase['severity'] = "Error" + +# return [erase] + +# def get_networks(self): +# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'network') +# for c in nodes: +# capacity = c.get('capacity') +# units = c.get('units') +# speed = None +# if capacity and units: +# speed = unit.Quantity(capacity, units).to('Mbit/s').m +# wireless = bool(c.get('configuration', {}).get('wireless', False)) +# self.components.append( +# { +# "actions": [], +# "type": "NetworkAdapter", +# "model": c.get('product'), +# "manufacturer": c.get('vendor'), +# "serialNumber": c.get('serial'), +# "speed": speed, +# "variant": c.get('version', 1), +# "wireless": wireless, +# } +# ) + +# def get_sound_card(self): +# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'multimedia') +# for c in nodes: +# self.components.append( +# { +# "actions": [], +# "type": "SoundCard", +# "model": c.get('product'), +# "manufacturer": c.get('vendor'), +# "serialNumber": c.get('serial'), +# } +# ) + +# def get_display(self): # noqa: C901 +# TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' + +# for c in self.monitors: +# resolution_width, resolution_height = (None,) * 2 +# refresh, serial, model, manufacturer, size = (None,) * 5 +# year, week, production_date = (None,) * 3 + +# for x in c: +# if "Vendor: " in x: +# manufacturer = x.split('Vendor: ')[-1].strip() +# if "Model: " in x: +# model = x.split('Model: ')[-1].strip() +# if "Serial ID: " in x: +# serial = x.split('Serial ID: ')[-1].strip() +# if " Resolution: " in x: +# rs = x.split(' Resolution: ')[-1].strip() +# if 'x' in rs: +# resolution_width, resolution_height = [ +# int(r) for r in rs.split('x') +# ] +# if "Frequencies: " in x: +# try: +# refresh = int(float(x.split(',')[-1].strip()[:-3])) +# except Exception: +# pass +# if 'Year of Manufacture' in x: +# year = x.split(': ')[1] + +# if 'Week of Manufacture' in x: +# week = x.split(': ')[1] + +# if "Size: " in x: +# size = self.get_size_monitor(x) +# technology = next((t for t in TECHS if t in c[0]), None) + +# if year and week: +# d = '{} {} 0'.format(year, week) +# production_date = datetime.strptime(d, '%Y %W %w').isoformat() + +# self.components.append( +# { +# "actions": [], +# "type": "Display", +# "model": model, +# "manufacturer": manufacturer, +# "serialNumber": serial, +# 'size': size, +# 'resolutionWidth': resolution_width, +# 'resolutionHeight': resolution_height, +# "productionDate": production_date, +# 'technology': technology, +# 'refreshRate': refresh, +# } +# ) + +# def get_hwinfo_monitors(self): +# for c in self.hwinfo: +# monitor = None +# external = None +# for x in c: +# if 'Hardware Class: monitor' in x: +# monitor = c +# if 'Driver Info' in x: +# external = c + +# if monitor and not external: +# self.monitors.append(c) + +# def get_size_monitor(self, x): +# i = 1 / 25.4 +# t = x.split('Size: ')[-1].strip() +# tt = t.split('mm') +# if not tt: +# return 0 +# sizes = tt[0].strip() +# if 'x' not in sizes: +# return 0 +# w, h = [int(x) for x in sizes.split('x')] +# return numpy.sqrt(w**2 + h**2) * i + +# def get_cpu_address(self, cpu): +# default = 64 +# for ch in self.lshw.get('children', []): +# for c in ch.get('children', []): +# if c['class'] == 'processor': +# return c.get('width', default) +# return default + +# def get_usb_num(self): +# return len( +# [ +# u +# for u in self.dmi.get("Port Connector") +# if "USB" in u.get("Port Type", "").upper() +# ] +# ) + +# def get_serial_num(self): +# return len( +# [ +# u +# for u in self.dmi.get("Port Connector") +# if "SERIAL" in u.get("Port Type", "").upper() +# ] +# ) + +# def get_firmware_num(self): +# return len( +# [ +# u +# for u in self.dmi.get("Port Connector") +# if "FIRMWARE" in u.get("Port Type", "").upper() +# ] +# ) + +# def get_pcmcia_num(self): +# return len( +# [ +# u +# for u in self.dmi.get("Port Connector") +# if "PCMCIA" in u.get("Port Type", "").upper() +# ] +# ) + +# def get_bios_date(self): +# return self.dmi.get("BIOS")[0].get("Release Date", self.default) + +# def get_firmware(self): +# return self.dmi.get("BIOS")[0].get("Firmware Revision", '1') + +# def get_max_ram_size(self): +# size = 0 +# for slot in self.dmi.get("Physical Memory Array"): +# capacity = slot.get("Maximum Capacity", '0').split(" ")[0] +# size += int(capacity) + +# return size + +# def get_ram_slots(self): +# slots = 0 +# for x in self.dmi.get("Physical Memory Array"): +# slots += int(x.get("Number Of Devices", 0)) +# return slots + +# def get_ram_size(self, ram): +# try: +# memory = ram.get("Size", "0") +# memory = memory.split(' ') +# if len(memory) > 1: +# size = int(memory[0]) +# units = memory[1] +# return base2.Quantity(size, units).to('MiB').m +# return int(size.split(" ")[0]) +# except Exception as err: +# logger.error("get_ram_size error: {}".format(err)) +# return 0 + +# def get_ram_speed(self, ram): +# size = ram.get("Speed", "0") +# return int(size.split(" ")[0]) + +# def get_cpu_speed(self, cpu): +# speed = cpu.get('Max Speed', "0") +# return float(speed.split(" ")[0]) / 1024 + +# def get_sku(self): +# return self.dmi.get("System")[0].get("SKU Number", self.default).strip() + +# def get_version(self): +# return self.dmi.get("System")[0].get("Version", self.default).strip() + +# def get_uuid(self): +# return self.dmi.get("System")[0].get("UUID", '').strip() + +# def get_family(self): +# return self.dmi.get("System")[0].get("Family", '') + +# def get_chassis(self): +# return self.dmi.get("Chassis")[0].get("Type", '_virtual') + +# def get_type(self): +# chassis_type = self.get_chassis() +# return self.translation_to_devicehub(chassis_type) + +# def translation_to_devicehub(self, original_type): +# lower_type = original_type.lower() +# CHASSIS_TYPE = { +# 'Desktop': [ +# 'desktop', +# 'low-profile', +# 'tower', +# 'docking', +# 'all-in-one', +# 'pizzabox', +# 'mini-tower', +# 'space-saving', +# 'lunchbox', +# 'mini', +# 'stick', +# ], +# 'Laptop': [ +# 'portable', +# 'laptop', +# 'convertible', +# 'tablet', +# 'detachable', +# 'notebook', +# 'handheld', +# 'sub-notebook', +# ], +# 'Server': ['server'], +# 'Computer': ['_virtual'], +# } +# for k, v in CHASSIS_TYPE.items(): +# if lower_type in v: +# return k +# return self.default + +# def get_chassis_dh(self): +# CHASSIS_DH = { +# 'Tower': {'desktop', 'low-profile', 'tower', 'server'}, +# 'Docking': {'docking'}, +# 'AllInOne': {'all-in-one'}, +# 'Microtower': {'mini-tower', 'space-saving', 'mini'}, +# 'PizzaBox': {'pizzabox'}, +# 'Lunchbox': {'lunchbox'}, +# 'Stick': {'stick'}, +# 'Netbook': {'notebook', 'sub-notebook'}, +# 'Handheld': {'handheld'}, +# 'Laptop': {'portable', 'laptop'}, +# 'Convertible': {'convertible'}, +# 'Detachable': {'detachable'}, +# 'Tablet': {'tablet'}, +# 'Virtual': {'_virtual'}, +# } + +# chassis = self.get_chassis() +# lower_type = chassis.lower() +# for k, v in CHASSIS_DH.items(): +# if lower_type in v: +# return k +# return self.default + +# def get_data_storage_type(self, x): +# # TODO @cayop add more SSDS types +# SSDS = ["nvme"] +# SSD = 'SolidStateDrive' +# HDD = 'HardDrive' +# type_dev = x.get('device', {}).get('type') +# trim = x.get('trim', {}).get("supported") in [True, "true"] +# return SSD if type_dev in SSDS or trim else HDD + +# def get_data_storage_interface(self, x): +# interface = x.get('device', {}).get('protocol', 'ATA') +# try: +# DataStorageInterface(interface.upper()) +# except ValueError as err: +# txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format( +# self.sid, interface +# ) +# self.errors("{}".format(err)) +# self.errors(txt, severity=Severity.Warning) +# return "ATA" + +# def get_data_storage_size(self, x): +# total_capacity = x.get('user_capacity', {}).get('bytes') +# if not total_capacity: +# return 1 +# # convert bytes to Mb +# return total_capacity / 1024**2 + +# def parse_hwinfo(self): +# hw_blocks = self.hwinfo_raw.split("\n\n") +# return [x.split("\n") for x in hw_blocks] + +# def loads(self, x): +# if isinstance(x, str): +# return json.loads(x) +# return x + +# def errors(self, txt=None, severity=Severity.Error): +# if not txt: +# return self._errors + +# logger.error(txt) +# self._errors.append(txt) +# error = SnapshotsLog( +# description=txt, +# snapshot_uuid=self.uuid, +# severity=severity, +# sid=self.sid, +# version=self.version, +# ) +# error.save() + + diff --git a/snapshot/serializers.py b/snapshot/serializers.py index 222ae25..ef2662c 100644 --- a/snapshot/serializers.py +++ b/snapshot/serializers.py @@ -1,8 +1,43 @@ from rest_framework import serializers -from snapshot.models import Snapshot +from snapshot.models import SnapshotJson + +import json + +from django.views.decorators.csrf import csrf_exempt +from django.http import JsonResponse +from snapshot.parse import Parse class SnapshotSerializer(serializers.ModelSerializer): class Meta: - model = Snapshot + model = SnapshotJson fields = ['id', 'title', 'content'] +@csrf_exempt +def webhook_verify(request): + if request.method == 'POST': + auth_header = request.headers.get('Authorization') + if not auth_header or not auth_header.startswith('Bearer '): + return JsonResponse({'error': 'Invalid authorization'}, status=401) + + token = auth_header.split(' ')[1] + tk = Token.objects.filter(token=token).first() + if not tk: + return JsonResponse({'error': 'Invalid authorization'}, status=401) + + try: + data = json.loads(request.body) + except json.JSONDecodeError: + return JsonResponse({'error': 'Invalid JSON'}, status=400) + + try: + device = Parse(data) + except Exception: + return JsonResponse({'error': 'Invalid JSON'}, status=400) + + if not device: + return JsonResponse({'error': 'Invalid JSON'}, status=400) + + return JsonResponse({"result": "Ok"}, status=200) + + + return JsonResponse({'error': 'Invalid request method'}, status=400) diff --git a/snapshot/views.py b/snapshot/views.py index c8d9bc4..0ce4c01 100644 --- a/snapshot/views.py +++ b/snapshot/views.py @@ -1,11 +1,11 @@ # from django.shortcuts import render -from rest_framework import viewsets -from snapshot.models import Snapshot -from snapshot.serializers import SnapshotSerializer +# from rest_framework import viewsets +# from snapshot.models import Snapshot +# from snapshot.serializers import SnapshotSerializer -class SnapshotViewSet(viewsets.ModelViewSet): - queryset = Snapshot.objects.all() - serializer_class = SnapshotSerializer +# class SnapshotViewSet(viewsets.ModelViewSet): +# queryset = Snapshot.objects.all() +# serializer_class = SnapshotSerializer