From 765858ac1bfdb5820e47129ed482661a398e25a8 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Tue, 11 Jul 2023 14:06:16 +0200 Subject: [PATCH] fix snapshotParse for hwmd --- ereuse_devicehub/api/views.py | 4 +- ereuse_devicehub/inventory/forms.py | 4 +- ereuse_devicehub/parser/parser.py | 271 ++++++++++++------ ereuse_devicehub/parser/schemas.py | 1 - ereuse_devicehub/resources/action/schemas.py | 15 +- .../resources/action/views/snapshot.py | 9 +- ereuse_devicehub/teal/marshmallow.py | 6 +- 7 files changed, 215 insertions(+), 95 deletions(-) diff --git a/ereuse_devicehub/api/views.py b/ereuse_devicehub/api/views.py index ceddd791..37b3411e 100644 --- a/ereuse_devicehub/api/views.py +++ b/ereuse_devicehub/api/views.py @@ -13,7 +13,7 @@ from werkzeug.exceptions import Unauthorized from ereuse_devicehub.auth import Auth from ereuse_devicehub.db import db from ereuse_devicehub.parser.models import SnapshotsLog -from ereuse_devicehub.parser.parser import ParseSnapshotLsHw +from ereuse_devicehub.parser.parser import ParseSnapshot from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.resources.action.views.snapshot import ( SnapshotMixin, @@ -59,7 +59,7 @@ class InventoryView(LoginMixin, SnapshotMixin): return snapshot_json try: - self.snapshot_json = ParseSnapshotLsHw(snapshot_json).get_snapshot() + self.snapshot_json = ParseSnapshot(snapshot_json).get_snapshot() raise 1 == 2 except Exception as err: logger.error("Error: {} \n{}\n".format(err, self.snapshot_json)) diff --git a/ereuse_devicehub/inventory/forms.py b/ereuse_devicehub/inventory/forms.py index cd6488db..736d308e 100644 --- a/ereuse_devicehub/inventory/forms.py +++ b/ereuse_devicehub/inventory/forms.py @@ -39,7 +39,7 @@ from ereuse_devicehub.inventory.models import ( TransferCustomerDetails, ) from ereuse_devicehub.parser.models import PlaceholdersLog, SnapshotsLog -from ereuse_devicehub.parser.parser import ParseSnapshot, ParseSnapshotLsHw +from ereuse_devicehub.parser.parser import ParseSnapshot from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.resources.action.models import Snapshot, Trade from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema @@ -315,7 +315,7 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm): return True - def save(self, commit=True, user_trusts=True): + def save(self, commit=True, user_trusts=True): # noqa: C901 if any([x == 'Error' for x in self.result.values()]): return schema = SnapshotSchema() diff --git a/ereuse_devicehub/parser/parser.py b/ereuse_devicehub/parser/parser.py index efc7bab2..1f8f4071 100644 --- a/ereuse_devicehub/parser/parser.py +++ b/ereuse_devicehub/parser/parser.py @@ -2,6 +2,7 @@ import json import logging import uuid +import numpy from dmidecode import DMIParse from flask import request from marshmallow.exceptions import ValidationError @@ -25,6 +26,7 @@ class ParseSnapshot: self.lscpi_raw = snapshot["hwmd"]["lspci"] self.device = {"actions": []} self.components = [] + self.monitors = [] self.dmi = DMIParse(self.dmidecode_raw) self.smart = self.loads(self.smart_raw) @@ -32,14 +34,15 @@ class ParseSnapshot: self.hwinfo = self.parse_hwinfo() self.set_computer() + self.get_hwinfo_monitors() self.set_components() self.snapshot_json = { "device": self.device, "software": "UsodyOS", "components": self.components, "uuid": snapshot['uuid'], - "type": snapshot['type'], - "version": "1.0.0", + "version": snapshot['version'], + "settings_version": snapshot['settings_version'], "endTime": snapshot["timestamp"], "elapsed": 1, "sid": snapshot["sid"], @@ -63,7 +66,10 @@ class ParseSnapshot: self.get_cpu() self.get_ram() self.get_mother_board() + self.get_graphic() self.get_data_storage() + self.get_display() + self.get_sound_card() self.get_networks() def get_cpu(self): @@ -87,21 +93,6 @@ class ParseSnapshot: } ) - def get_cpu_address(self, cpu): - default = 64 - for ch in self.lshw.get('children', []): - for c in ch.get('children', []): - if c['class'] == 'processor': - return c.get('width', default) - return default - - def get_ram_model(self, ram): - for ch in self.lshw.get('children', []): - for c in ch.get('children', []): - if c['class'] == 'memory': - if c.get('serial') == ram.get('Serial Number'): - return c.get('Vendor') - def get_ram(self): for ram in self.dmi.get("Memory Device"): self.components.append( @@ -114,14 +105,11 @@ class ParseSnapshot: "serialNumber": ram.get("Serial Number", self.default), "interface": self.get_ram_type(ram), "format": ram.get("Form Factor", "DIMM"), - "partNumber": ram.get("Part Number", self.default), - "model": self.get_ram_model(ram), + "model": ram.get("Part Number", self.default), } ) def get_mother_board(self): - # TODO @cayop model, not exist in dmidecode - # import pdb; pdb.set_trace() for moder_board in self.dmi.get("Baseboard"): self.components.append( { @@ -131,17 +119,170 @@ class ParseSnapshot: "serialNumber": moder_board.get("Serial Number"), "manufacturer": moder_board.get("Manufacturer"), "biosDate": self.get_bios_date(), - # "firewire": self.get_firmware(), "ramMaxSize": self.get_max_ram_size(), "ramSlots": len(self.dmi.get("Memory Device")), "slots": self.get_ram_slots(), - "model": moder_board.get("Product Name"), # ?? - "pcmcia": self.get_pcmcia_num(), # ?? - "serial": self.get_serial_num(), # ?? + "model": moder_board.get("Product Name"), + "firewire": self.get_firmware_num(), + "pcmcia": self.get_pcmcia_num(), + "serial": self.get_serial_num(), "usb": self.get_usb_num(), } ) + def get_graphic(self): + for ch in self.lshw.get('children', []): + for c in ch.get('children', []): + if c['class'] != 'display': + continue + self.components.append( + { + "actions": [], + "type": "GraphicCard", + "manufacturer": c.get("vendor", self.default), + "model": c.get("product", self.default), + "serialNumber": c.get("serial", self.default), + } + ) + + def get_data_storage(self): + for sm in self.smart: + if sm.get('smartctl', {}).get('exit_status') == 1: + continue + model = sm.get('model_name') + manufacturer = None + if model and len(model.split(" ")) > 1: + mm = model.split(" ") + model = mm[-1] + manufacturer = " ".join(mm[:-1]) + + self.components.append( + { + "actions": [], + "type": self.get_data_storage_type(sm), + "model": model, + "manufacturer": manufacturer, + "serialNumber": sm.get('serial_number'), + "size": self.get_data_storage_size(sm), + "variant": sm.get("firmware_version"), + "interface": self.get_data_storage_interface(sm), + } + ) + + def get_networks(self): + for ch in self.lshw.get('children', []): + for c in ch.get('children', []): + if c['class'] == 'networks': + capacity = c.get('capacity') + units = c.get('units') + speed = None + if capacity and units: + speed = base2.unit.Quantity(capacity, units).to('Mbit/s').m + wireless = bool(c.get('configuration', {}).get('wireless', False)) + self.components.append( + { + "actions": [], + "type": "NetworkAdapter", + "model": c.get('product'), + "manufacturer": c.get('vendor'), + "serialNumber": c.get('serial'), + "speed": speed, + "variant": c.get('version', 1), + "wireless": wireless, + } + ) + + def get_sound_card(self): + for ch in self.lshw.get('children', []): + for c in ch.get('children', []): + if c['class'] == 'multimedia': + self.components.append( + { + "actions": [], + "type": "SoundCard", + "model": c.get('product'), + "manufacturer": c.get('vendor'), + "serialNumber": c.get('serial'), + } + ) + + def get_display(self): # noqa: C901 + TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' + + for c in self.monitors: + resolution_width, resolution_height = (None,) * 2 + refresh, serial, model, manufacturer, size = (None,) * 5 + + for x in c: + if "Vendor: " in x: + manufacturer = x.split('Vendor: ')[-1].strip() + if "Model: " in x: + model = x.split('Model: ')[-1].strip() + if "Serial ID: " in x: + serial = x.split('Serial ID: ')[-1].strip() + if " Resolution: " in x: + rs = x.split(' Resolution: ')[-1].strip() + if 'x' in rs: + resolution_width, resolution_height = [ + int(r) for r in rs.split('x') + ] + if "Frequencies: " in x: + try: + refresh = int(float(x.split(',')[-1].strip()[:-3])) + except Exception: + pass + + if "Size: " in x: + size = self.get_size_monitor(x) + technology = next((t for t in TECHS if t in c[0]), None) + self.components.append( + { + "actions": [], + "type": "Display", + "model": model, + "manufacturer": manufacturer, + "serialNumber": serial, + 'size': size, + 'resolution_width': resolution_width, + 'resolution_height': resolution_height, + 'technology': technology, + 'refresh_rate': refresh, + } + ) + + def get_hwinfo_monitors(self): + for c in self.hwinfo: + monitor = None + external = None + for x in c: + if 'Hardware Class: monitor' in x: + monitor = c + if 'Driver Info' in x: + external = c + + if monitor and not external: + self.monitors.append(c) + + def get_size_monitor(self, x): + i = 1 / 25.4 + t = x.split('Size: ')[-1].strip() + tt = t.split('mm') + if not tt: + return 0 + sizes = tt[0].strip() + if 'x' not in sizes: + return 0 + w, h = [int(x) for x in sizes.split('x')] + return numpy.sqrt(w**2 + h**2) * i + + def get_cpu_address(self, cpu): + default = 64 + for ch in self.lshw.get('children', []): + for c in ch.get('children', []): + if c['class'] == 'processor': + return c.get('width', default) + return default + def get_usb_num(self): return len( [ @@ -160,6 +301,15 @@ class ParseSnapshot: ] ) + def get_firmware_num(self): + return len( + [ + u + for u in self.dmi.get("Port Connector") + if "FIRMWARE" in u.get("Port Type", "").upper() + ] + ) + def get_pcmcia_num(self): return len( [ @@ -290,72 +440,33 @@ class ParseSnapshot: return k return self.default - def get_data_storage(self): - - for sm in self.smart: - model = sm.get('model_name') - manufacturer = None - if len(model.split(" ")) == 2: - manufacturer, model = model.split(" ") - - self.components.append( - { - "actions": [], - "type": self.get_data_storage_type(sm), - "model": model, - "manufacturer": manufacturer, - "serialNumber": sm.get('serial_number'), - "size": self.get_data_storage_size(sm), - "variant": sm.get("firmware_version"), - "interface": self.get_data_storage_interface(sm), - } - ) - def get_data_storage_type(self, x): # TODO @cayop add more SSDS types SSDS = ["nvme"] SSD = 'SolidStateDrive' HDD = 'HardDrive' type_dev = x.get('device', {}).get('type') - return SSD if type_dev in SSDS else HDD + trim = x.get('trim', {}).get("supported") in [True, "true"] + return SSD if type_dev in SSDS or trim else HDD def get_data_storage_interface(self, x): - return x.get('device', {}).get('protocol', 'ATA') + interface = x.get('device', {}).get('protocol', 'ATA') + try: + DataStorageInterface(interface.upper()) + except ValueError as err: + txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format( + self.sid, interface + ) + self.errors("{}".format(err)) + self.errors(txt, severity=Severity.Warning) + return "ATA" def get_data_storage_size(self, x): - type_dev = x.get('device', {}).get('type') - total_capacity = "{type}_total_capacity".format(type=type_dev) + total_capacity = x.get('user_capacity', {}).get('bytes') + if not total_capacity: + return 1 # convert bytes to Mb - return x.get(total_capacity) / 1024**2 - - def get_networks(self): - hw_class = " Hardware Class: " - mac = " Permanent HW Address: " - model = " Model: " - wireless = "wireless" - - for line in self.hwinfo: - iface = { - "variant": "1", - "actions": [], - "speed": 100.0, - "type": "NetworkAdapter", - "wireless": False, - "manufacturer": "Ethernet", - } - for y in line: - if hw_class in y and not y.split(hw_class)[1] == 'network': - break - - if mac in y: - iface["serialNumber"] = y.split(mac)[1] - if model in y: - iface["model"] = y.split(model)[1] - if wireless in y: - iface["wireless"] = True - - if iface.get("serialNumber"): - self.components.append(iface) + return total_capacity / 1024**2 def parse_hwinfo(self): hw_blocks = self.hwinfo_raw.split("\n\n") diff --git a/ereuse_devicehub/parser/schemas.py b/ereuse_devicehub/parser/schemas.py index 8b0fdd34..aead6541 100644 --- a/ereuse_devicehub/parser/schemas.py +++ b/ereuse_devicehub/parser/schemas.py @@ -36,7 +36,6 @@ class Snapshot_lite(Thing): # description='The software that generated this Snapshot.', # ) sid = String(required=True) - type = String(required=True) timestamp = String(required=True) settings_version = String(required=False) hwmd = Nested(Snapshot_lite_data, required=True) diff --git a/ereuse_devicehub/resources/action/schemas.py b/ereuse_devicehub/resources/action/schemas.py index 4760edf6..5eba4a12 100644 --- a/ereuse_devicehub/resources/action/schemas.py +++ b/ereuse_devicehub/resources/action/schemas.py @@ -469,9 +469,12 @@ class Snapshot(ActionWithOneDevice): @validates_schema def validate_components_only_workbench(self, data: dict): - if (data['software'] != SnapshotSoftware.Workbench) and ( - data['software'] != SnapshotSoftware.WorkbenchAndroid - ): + software = [ + SnapshotSoftware.Workbench, + SnapshotSoftware.WorkbenchAndroid, + SnapshotSoftware.UsodyOS, + ] + if data['software'] not in software: if data.get('components', None) is not None: raise ValidationError( 'Only Workbench can add component info', field_names=['components'] @@ -481,7 +484,11 @@ class Snapshot(ActionWithOneDevice): def validate_only_workbench_fields(self, data: dict): """Ensures workbench has ``elapsed`` and ``uuid`` and no others.""" # todo test - if data['software'] == SnapshotSoftware.Workbench: + software = [ + SnapshotSoftware.Workbench, + SnapshotSoftware.UsodyOS, + ] + if data['software'] in software: if not data.get('uuid', None): raise ValidationError( 'Snapshots from Workbench and WorkbenchAndroid must have uuid', diff --git a/ereuse_devicehub/resources/action/views/snapshot.py b/ereuse_devicehub/resources/action/views/snapshot.py index 8fb2758e..4a7bc642 100644 --- a/ereuse_devicehub/resources/action/views/snapshot.py +++ b/ereuse_devicehub/resources/action/views/snapshot.py @@ -70,9 +70,12 @@ class SnapshotMixin: snapshot_json = self.snapshot_json device = snapshot_json.pop('device') # type: Computer components = None - if snapshot_json['software'] == ( - SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid - ): + software = [ + SnapshotSoftware.Workbench, + SnapshotSoftware.WorkbenchAndroid, + SnapshotSoftware.UsodyOS, + ] + if snapshot_json['software'] in software: components = snapshot_json.pop('components', None) snapshot = Snapshot(**snapshot_json) diff --git a/ereuse_devicehub/teal/marshmallow.py b/ereuse_devicehub/teal/marshmallow.py index 72189930..1517f2ea 100644 --- a/ereuse_devicehub/teal/marshmallow.py +++ b/ereuse_devicehub/teal/marshmallow.py @@ -1,10 +1,9 @@ import ipaddress -from distutils.version import StrictVersion +from distutils.version import LooseVersion from typing import Type, Union import colour from boltons import strutils, urlutils -from ereuse_devicehub.ereuse_utils import if_none_return_none from flask import current_app as app from flask import g from marshmallow import utils @@ -17,6 +16,7 @@ from marshmallow.validate import Validator from marshmallow_enum import EnumField as _EnumField from sqlalchemy_utils import PhoneNumber +from ereuse_devicehub.ereuse_utils import if_none_return_none from ereuse_devicehub.teal import db as tealdb from ereuse_devicehub.teal.resource import Schema @@ -30,7 +30,7 @@ class Version(Field): @if_none_return_none def _deserialize(self, value, attr, data): - return StrictVersion(value) + return LooseVersion(value) class Color(Field):