From 417276b88bc2efca0a8930fde77270ec6a44b3d0 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Tue, 22 Mar 2022 13:17:41 +0100 Subject: [PATCH] init parse from wb --- ereuse_devicehub/parser/__init__.py | 30 + ereuse_devicehub/parser/computer.py | 541 ++++++++++++++++++ ereuse_devicehub/parser/snapshot.py | 56 ++ .../parser/unit_registry/base2.quantities.txt | 4 + .../parser/unit_registry/quantities.txt | 9 + ereuse_devicehub/parser/utils.py | 38 ++ ereuse_devicehub/parser/workbench | 1 + 7 files changed, 679 insertions(+) create mode 100644 ereuse_devicehub/parser/__init__.py create mode 100644 ereuse_devicehub/parser/computer.py create mode 100644 ereuse_devicehub/parser/snapshot.py create mode 100644 ereuse_devicehub/parser/unit_registry/base2.quantities.txt create mode 100644 ereuse_devicehub/parser/unit_registry/quantities.txt create mode 100644 ereuse_devicehub/parser/utils.py create mode 160000 ereuse_devicehub/parser/workbench diff --git a/ereuse_devicehub/parser/__init__.py b/ereuse_devicehub/parser/__init__.py new file mode 100644 index 00000000..c3f9cfd6 --- /dev/null +++ b/ereuse_devicehub/parser/__init__.py @@ -0,0 +1,30 @@ +from pathlib import Path + +from pint import UnitRegistry + +# Sets up the unit handling +unit_registry = Path(__file__).parent / 'unit_registry' + +unit = UnitRegistry() +unit.load_definitions(str(unit_registry / 'quantities.txt')) +TB = unit.TB +GB = unit.GB +MB = unit.MB +Mbs = unit.Mbit / unit.s +MBs = unit.MB / unit.s +Hz = unit.Hz +GHz = unit.GHz +MHz = unit.MHz +Inch = unit.inch +mAh = unit.hour * unit.mA +mV = unit.mV + +base2 = UnitRegistry() +base2.load_definitions(str(unit_registry / 'base2.quantities.txt')) + +GiB = base2.GiB + + +# pint +# numpy +# pySMART diff --git a/ereuse_devicehub/parser/computer.py b/ereuse_devicehub/parser/computer.py new file mode 100644 index 00000000..41979e16 --- /dev/null +++ b/ereuse_devicehub/parser/computer.py @@ -0,0 +1,541 @@ +import json +import os +import re +import subprocess +from contextlib import suppress +from datetime import datetime +from enum import Enum, unique +from fractions import Fraction +from subprocess import CalledProcessError, PIPE, run +from typing import Iterator, List, Optional, Tuple, Type, TypeVar +from warnings import catch_warnings, filterwarnings + +import dateutil.parser +import pySMART +from ereuse_utils import cmd, getter as g, text +from ereuse_utils.nested_lookup import get_nested_dicts_with_key_containing_value, \ + get_nested_dicts_with_key_value +from numpy import hypot + +from ereuse_devicehub.parser import base2, unit, utils +from ereuse_devicehub.parser.utils import Dumpeable + + +class Device(Dumpeable): + """ + Base class for a computer and each component, containing + its physical characteristics (like serial number) and Devicehub + actions. For Devicehub actions, this class has an interface to execute + :meth:`.benchmarks`. + """ + + def __init__(self, *sources) -> None: + """Gets the device information.""" + self.actions = set() + self.type = self.__class__.__name__ + super().__init__() + + def from_lshw(self, lshw_node: dict): + self.manufacturer = g.dict(lshw_node, 'vendor', default=None, type=str) + self.model = g.dict(lshw_node, + 'product', + remove={self.manufacturer} if self.manufacturer else set(), + default=None, + type=str) + self.serial_number = g.dict(lshw_node, 'serial', default=None, type=str) + + def __str__(self) -> str: + return ' '.join(x for x in (self.model, self.serial_number) if x) + + +C = TypeVar('C', bound='Component') + + +class Component(Device): + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + raise NotImplementedError() + + +class Processor(Component): + @classmethod + def new(cls, lshw: dict, **kwargs) -> Iterator[C]: + nodes = get_nested_dicts_with_key_value(lshw, 'class', 'processor') + # We want only the physical cpu's, not the logic ones + # In some cases we may get empty cpu nodes, we can detect them because + # all regular cpus have at least a description (Intel Core i5...) + return (cls(node) for node in nodes if + 'logical' not in node['id'] + and node.get('description', '').lower() != 'co-processor' + and not node.get('disabled') + and 'co-processor' not in node.get('model', '').lower() + and 'co-processor' not in node.get('description', '').lower() + and 'width' in node) + + def __init__(self, node: dict) -> None: + super().__init__(node) + self.from_lshw(node) + self.speed = unit.Quantity(node['size'], node['units']).to('gigahertz').m + self.address = node['width'] + try: + self.cores = int(node['configuration']['cores']) + self.threads = int(node['configuration']['threads']) + except KeyError: + self.threads = os.cpu_count() + if self.threads == 1: + self.cores = 1 # If there is only one thread there is only one core + self.serial_number = None # Processors don't have valid SN :-( + self.brand, self.generation = self.processor_brand_generation(self.model) + + assert not hasattr(self, 'cores') or 1 <= self.cores <= 16 + + + @staticmethod + def processor_brand_generation(model: str): + """Generates the ``brand`` and ``generation`` fields for the given model. + + This returns a tuple with: + + - The brand as a string or None. + - The generation as an int or None. + """ + # Intel desktop processor numbers: https://www.intel.com/content/www/us/en/processors/processor-numbers.html + # Intel server processor numbers: https://www.intel.com/content/www/us/en/processors/processor-numbers-data-center.html + + if 'Duo' in model: + return 'Core2 Duo', None + if 'Quad' in model: + return 'Core2 Quad', None + if 'Atom' in model: + return 'Atom', None + if 'Celeron' in model: + return 'Celeron', None + if 'Pentium' in model: + return 'Pentium', None + if 'Xeon Platinum' in model: + generation = int(re.findall(r'\bPlatinum \d{4}\w', model)[0][10]) + return 'Xeon Platinum', generation + if 'Xeon Gold' in model: + generation = int(re.findall(r'\bGold \d{4}\w', model)[0][6]) + return 'Xeon Gold', generation + if 'Xeon' in model: # Xeon E5... + generation = 1 + results = re.findall(r'\bV\d\b', model) # find V1, V2... + if results: + generation = int(results[0][1]) + return 'Xeon', generation + results = re.findall(r'\bi\d-\w+', model) # i3-XXX..., i5-XXX... + if results: # i3, i5... + return 'Core i{}'.format(results[0][1]), int(results[0][3]) + results = re.findall(r'\bi\d CPU \w+', model) + if results: # i3 CPU XXX + return 'Core i{}'.format(results[0][1]), 1 + results = re.findall(r'\bm\d-\w+', model) # m3-XXXX... + if results: + return 'Core m{}'.format(results[0][1]), None + return None, None + + def __str__(self) -> str: + return super().__str__() + ( + ' ({} generation)'.format(self.generation) if self.generation else '' + ) + + +class RamModule(Component): + @classmethod + def new(cls, lshw, **kwargs) -> Iterator[C]: + # We can get flash memory (BIOS?), system memory and unknown types of memory + memories = get_nested_dicts_with_key_value(lshw, 'class', 'memory') + TYPES = {'ddr', 'sdram', 'sodimm'} + for memory in memories: + physical_ram = any(t in memory.get('description', '').lower() for t in TYPES) + not_empty = 'size' in memory + if physical_ram and not_empty: + yield cls(memory) + + def __init__(self, node: dict) -> None: + # Node with no size == empty ram slot + super().__init__(node) + self.from_lshw(node) + description = node['description'].upper() + self.format = 'SODIMM' if 'SODIMM' in description else 'DIMM' + self.size = base2.Quantity(node['size'], node['units']).to('MiB').m + # self.size = int(utils.convert_capacity(node['size'], node['units'], 'MB')) + for w in description.split(): + if w.startswith('DDR'): # We assume all DDR are SDRAM + self.interface = w + break + elif w.startswith('SDRAM'): + # Fallback. SDRAM is generic denomination for DDR types. + self.interface = w + if 'clock' in node: + self.speed = unit.Quantity(node['clock'], 'Hz').to('MHz').m + assert not hasattr(self, 'speed') or 100.0 <= self.speed <= 1000000000000.0 + + def __str__(self) -> str: + return '{} {} {}'.format(super().__str__(), self.format, self.size) + + +class DataStorage(Component): + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + disks = get_nested_dicts_with_key_containing_value(lshw, 'id', 'disk') + + usb_disks = list() # List of disks that are plugged in an USB host + for usb in get_nested_dicts_with_key_containing_value(lshw, 'id', 'usbhost'): + usb_disks.extend(get_nested_dicts_with_key_containing_value(usb, 'id', 'disk')) + + for disk in (n for n in disks if n not in usb_disks): + # We can get nodes that are not truly disks as they don't have size + if 'size' in disk: + interface = DataStorage.get_interface(disk) + removable = interface == 'usb' or \ + disk.get('capabilities', {}).get('removable', False) + if not removable: + yield cls(disk, interface) + + SSD = 'SolidStateDrive' + HDD = 'HardDrive' + + @unique + class DataStorageInterface(Enum): + ATA = 'ATA' + USB = 'USB' + PCI = 'PCI' + + def __str__(self): + return self.value + + def __init__(self, node: dict, interface: str) -> None: + super().__init__(node) + self.from_lshw(node) + self.size = unit.Quantity(node['size'], node.get('units', 'B')).to('MB').m + self.interface = self.DataStorageInterface(interface.upper()) if interface else None + self._logical_name = node['logicalname'] + self.variant = node['version'] + + with catch_warnings(): + filterwarnings('error') + try: + smart = pySMART.Device(self._logical_name) + except Warning: + self.type = self.HDD + else: + self.type = self.SSD if smart.is_ssd else self.HDD + self.serial_number = self.serial_number or smart.serial + self.model = self.model or smart.model + + assert 1.0 < self.size < 1000000000000000.0, \ + 'Invalid HDD size {}'.format(self.size) + + def __str__(self) -> str: + return '{} {} {} with {} MB'.format(super().__str__(), self.interface, self.type, + self.size) + + @staticmethod + def get_interface(node: dict): + interface = run('udevadm info ' + '--query=all ' + '--name={} | ' + 'grep ' + 'ID_BUS | ' + 'cut -c 11-'.format(node['logicalname']), + check=True, universal_newlines=True, shell=True, stdout=PIPE).stdout + # todo not sure if ``interface != usb`` is needed + return interface.strip() + + +class GraphicCard(Component): + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + nodes = get_nested_dicts_with_key_value(lshw, 'class', 'display') + return (cls(n) for n in nodes if n['configuration'].get('driver', None)) + + def __init__(self, node: dict) -> None: + super().__init__(node) + self.from_lshw(node) + self.memory = self._memory(node['businfo'].split('@')[1]) + + @staticmethod + def _memory(bus_info): + """The size of the memory of the gpu.""" + try: + lines = cmd.run('lspci', + '-v -s {bus} | ', + 'grep \'prefetchable\' | ', + 'grep -v \'non-prefetchable\' | ', + 'egrep -o \'[0-9]{{1,3}}[KMGT]+\''.format(bus=bus_info), + shell=True).stdout.splitlines() + return max((base2.Quantity(value).to('MiB') for value in lines), default=None) + except subprocess.CalledProcessError: + return None + + def __str__(self) -> str: + return '{} with {}'.format(super().__str__(), self.memory) + + +class Motherboard(Component): + INTERFACES = 'usb', 'firewire', 'serial', 'pcmcia' + + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> C: + node = next(get_nested_dicts_with_key_value(lshw, 'description', 'Motherboard')) + bios_node = next(get_nested_dicts_with_key_value(lshw, 'id', 'firmware')) + memory_array = next(g.indents(hwinfo, 'Physical Memory Array', indent=' '), None) + return cls(node, bios_node, memory_array) + + def __init__(self, node: dict, bios_node: dict, memory_array: Optional[List[str]]) -> None: + super().__init__(node) + self.from_lshw(node) + self.usb = self.num_interfaces(node, 'usb') + self.firewire = self.num_interfaces(node, 'firewire') + self.serial = self.num_interfaces(node, 'serial') + self.pcmcia = self.num_interfaces(node, 'pcmcia') + self.slots = int(run('dmidecode -t 17 | ' + 'grep -o BANK | ' + 'wc -l', + check=True, + universal_newlines=True, + shell=True, + stdout=PIPE).stdout) + self.bios_date = dateutil.parser.parse(bios_node['date']) + self.version = bios_node['version'] + self.ram_slots = self.ram_max_size = None + if memory_array: + self.ram_slots = g.kv(memory_array, 'Slots', default=None) + self.ram_max_size = g.kv(memory_array, 'Max. Size', default=None) + if self.ram_max_size: + self.ram_max_size = next(text.numbers(self.ram_max_size)) + + @staticmethod + def num_interfaces(node: dict, interface: str) -> int: + interfaces = get_nested_dicts_with_key_containing_value(node, 'id', interface) + if interface == 'usb': + interfaces = (c for c in interfaces + if 'usbhost' not in c['id'] and 'usb' not in c['businfo']) + return len(tuple(interfaces)) + + def __str__(self) -> str: + return super().__str__() + + +class NetworkAdapter(Component): + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + nodes = get_nested_dicts_with_key_value(lshw, 'class', 'network') + return (cls(node) for node in nodes) + + def __init__(self, node: dict) -> None: + super().__init__(node) + self.from_lshw(node) + self.speed = None + if 'capacity' in node: + self.speed = unit.Quantity(node['capacity'], 'bit/s').to('Mbit/s').m + if 'logicalname' in node: # todo this was taken from 'self'? + # If we don't have logicalname it means we don't have the + # (proprietary) drivers fot that NetworkAdaptor + # which means we can't access at the MAC address + # (note that S/N == MAC) "sudo /sbin/lspci -vv" could bring + # the MAC even if no drivers are installed however more work + # has to be done in ensuring it is reliable, really needed, + # and to parse it + # https://www.redhat.com/archives/redhat-list/2010-October/msg00066.html + # workbench-live includes proprietary firmwares + self.serial_number = self.serial_number or utils.get_hw_addr(node['logicalname']) + + self.variant = node.get('version', None) + self.wireless = bool(node.get('configuration', {}).get('wireless', False)) + + def __str__(self) -> str: + return '{} {} {}'.format(super().__str__(), self.speed, + 'wireless' if self.wireless else 'ethernet') + + +class SoundCard(Component): + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + nodes = get_nested_dicts_with_key_value(lshw, 'class', 'multimedia') + return (cls(node) for node in nodes) + + def __init__(self, node) -> None: + super().__init__(node) + self.from_lshw(node) + + +class Display(Component): + TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' + """Display technologies""" + + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + for node in g.indents(hwinfo, 'Monitor'): + yield cls(node) + + def __init__(self, node: dict) -> None: + super().__init__(node) + self.model = g.kv(node, 'Model') + self.manufacturer = g.kv(node, 'Vendor') + self.serial_number = g.kv(node, 'Serial ID', default=None, type=str) + self.resolution_width, self.resolution_height, refresh_rate = text.numbers( + g.kv(node, 'Resolution') + ) + self.refresh_rate = unit.Quantity(refresh_rate, 'Hz').m + with suppress(StopIteration): + # some monitors can have several resolutions, and the one + # in "Detailed Timings" seems the highest one + timings = next(g.indents(node, 'Detailed Timings', indent=' ')) + self.resolution_width, self.resolution_height = text.numbers( + g.kv(timings, 'Resolution') + ) + x, y = (unit.Quantity(v, 'millimeter').to('inch') for v in + text.numbers(g.kv(node, 'Size'))) + self.size = float(hypot(x, y).m) + self.technology = next((t for t in self.TECHS if t in node[0]), None) + d = '{} {} 0'.format(g.kv(node, 'Year of Manufacture'), g.kv(node, 'Week of Manufacture')) + # We assume it has been produced the first day of such week + self.production_date = datetime.strptime(d, '%Y %W %w') + self._aspect_ratio = Fraction(self.resolution_width, self.resolution_height) + + def __str__(self) -> str: + return '{0} {1.resolution_width}x{1.resolution_height} {1.size} inches {2}'.format( + super().__str__(), self, self._aspect_ratio) + + +class Battery(Component): + class Technology(Enum): + """ereuse.org Battery technology with translated values from + the Linux Kernel convention, from + https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-power. + """ + LiIon = 'Li-ion' + NiCd = 'NiCd' + NiMH = 'NiMH' + LiPoly = 'Li-poly' + LiFe = 'LiFe' + LiMn = 'LiMn' + + PRE = 'POWER_SUPPLY_' + + @classmethod + def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]: + try: + uevent = cmd \ + .run('cat', '/sys/class/power_supply/BAT*/uevent', shell=True) \ + .stdout.splitlines() + except CalledProcessError: + return + yield cls(uevent) + + def __init__(self, node: List[str]) -> None: + super().__init__(node) + try: + self.serial_number = g.kv(node, self.PRE + 'SERIAL_NUMBER', sep='=', type=str) + self.manufacturer = g.kv(node, self.PRE + 'MANUFACTURER', sep='=') + self.model = g.kv(node, self.PRE + 'MODEL_NAME', sep='=') + self.size = g.kv(node, self.PRE + 'CHARGE_FULL_DESIGN', sep='=', default=0) + if self.size is not None: + self.size = self.size // 1000 + self.technology = g.kv(node, self.PRE + 'TECHNOLOGY', sep='=', type=self.Technology) + measure = MeasureBattery( + size=g.kv(node, self.PRE + 'CHARGE_FULL', sep='='), + voltage=g.kv(node, self.PRE + 'VOLTAGE_NOW', sep='='), + cycle_count=g.kv(node, self.PRE + 'CYCLE_COUNT', sep='=') + ) + try: + measure.size = measure.size.m + measure.voltage = measure.voltage.m + except AttributeError: + pass + self.actions.add(measure) + self._wear = round(1 - measure.size / self.size, 2) \ + if self.size and measure.size else None + self._node = node + except NoBatteryInfo: + self._node = None + + def __str__(self) -> str: + try: + return '{0} {1.technology}. Size: {1.size} Wear: {1._wear:%}'.format(super().__str__(), + self) + except TypeError: + return 'There is not currently battery information' + + +class Computer(Device): + CHASSIS_TYPE = { + 'Desktop': {'desktop', 'low-profile', 'tower', 'docking', 'all-in-one', 'pizzabox', + 'mini-tower', 'space-saving', 'lunchbox', 'mini', 'stick'}, + 'Laptop': {'portable', 'laptop', 'convertible', 'tablet', 'detachable', 'notebook', + 'handheld', 'sub-notebook'}, + 'Server': {'server'}, + 'Computer': {'_virtual'} + } + """ + A translation dictionary whose keys are Devicehub types and values + are possible chassis values that `dmi `_ can offer. + """ + CHASSIS_DH = { + 'Tower': {'desktop', 'low-profile', 'tower', 'server'}, + 'Docking': {'docking'}, + 'AllInOne': {'all-in-one'}, + 'Microtower': {'mini-tower', 'space-saving', 'mini'}, + 'PizzaBox': {'pizzabox'}, + 'Lunchbox': {'lunchbox'}, + 'Stick': {'stick'}, + 'Netbook': {'notebook', 'sub-notebook'}, + 'Handheld': {'handheld'}, + 'Laptop': {'portable', 'laptop'}, + 'Convertible': {'convertible'}, + 'Detachable': {'detachable'}, + 'Tablet': {'tablet'}, + 'Virtual': {'_virtual'} + } + """ + A conversion table from DMI's chassis type value Devicehub + chassis value. + """ + + COMPONENTS = list(Component.__subclasses__()) # type: List[Type[Component]] + COMPONENTS.remove(Motherboard) + COMPONENTS.remove(Battery) + + def __init__(self, node: dict) -> None: + super().__init__(node) + self.from_lshw(node) + chassis = node['configuration'].get('chassis', '_virtual') + self.type = next(t for t, values in self.CHASSIS_TYPE.items() if chassis in values) + self.chassis = next(t for t, values in self.CHASSIS_DH.items() if chassis in values) + self.sku = g.dict(node, ('configuration', 'sku'), default=None, type=str) + self.version = g.dict(node, 'version', default=None, type=str) + self._ram = None + + @classmethod + def run(cls, lshw_raw, hwinfo_raw): + """ + Gets hardware information from the computer and its components, + like serial numbers or model names, and benchmarks them. + + This function uses ``LSHW`` as the main source of hardware information, + which is obtained once when it is instantiated. + """ + lshw = json.loads(lshw_raw) + hwinfo = hwinfo_raw.splitlines() + computer = cls(lshw) + components = [] + for Component in cls.COMPONENTS: + if Component == Display and computer.type != 'Laptop': + continue # Only get display info when computer is laptop + components.extend(Component.new(lshw=lshw, hwinfo=hwinfo)) + components.append(Motherboard.new(lshw, hwinfo)) + + computer._ram = sum(ram.size for ram in components if isinstance(ram, RamModule)) + return computer, components + + def __str__(self) -> str: + specs = super().__str__() + return '{} with {} MB of RAM.'.format(specs, self._ram) + + +class NoBatteryInfo(Exception): + print('Cannot get battery information') diff --git a/ereuse_devicehub/parser/snapshot.py b/ereuse_devicehub/parser/snapshot.py new file mode 100644 index 00000000..be403a04 --- /dev/null +++ b/ereuse_devicehub/parser/snapshot.py @@ -0,0 +1,56 @@ +from datetime import datetime, timezone +from distutils.version import StrictVersion +from enum import Enum, unique +from typing import List, Optional +from uuid import UUID + +import inflection +from ereuse_utils import cli +from ereuse_utils.cli import Line +from ereuse_utils.session import DevicehubClient + +from ereuse_workbench.computer import Component, Computer, DataStorage, SoundCard +from ereuse_workbench.utils import Dumpeable + + +@unique +class SnapshotSoftware(Enum): + """The algorithm_software used to perform the Snapshot.""" + Workbench = 'Workbench' + AndroidApp = 'AndroidApp' + Web = 'Web' + DesktopApp = 'DesktopApp' + + +class Snapshot(Dumpeable): + """ + Generates the Snapshot report for Devicehub by obtaining the + data from the computer, performing benchmarks and tests... + + After instantiating the class, run :meth:`.computer` before any + other method. + """ + + def __init__(self, uuid, software, version, lshw, hwinfo): + self.type = 'Snapshot' + self.uuid = uuid + self.software = software + self.version = version + self.lshw = lshw + self.hwinfo = hwinfo + self.endTime = datetime.now(timezone.utc) + self.closed = False + self.elapsed = None + self.device = None # type: Computer + self.components = None # type: List[Component] + self._storages = None + + def computer(self): + """Retrieves information about the computer and components.""" + self.device, self.components = Computer.run(self.lshw, self.hwinfo) + self._storages = tuple(c for c in self.components if isinstance(c, DataStorage)) + + def close(self): + """Closes the Snapshot + """ + self.closed = True diff --git a/ereuse_devicehub/parser/unit_registry/base2.quantities.txt b/ereuse_devicehub/parser/unit_registry/base2.quantities.txt new file mode 100644 index 00000000..2c724a2d --- /dev/null +++ b/ereuse_devicehub/parser/unit_registry/base2.quantities.txt @@ -0,0 +1,4 @@ +K = KiB = k = kb = KB +M = MiB = m = mb = MB +G = GiB = g = gb = GB +T = TiB = t = tb = TB diff --git a/ereuse_devicehub/parser/unit_registry/quantities.txt b/ereuse_devicehub/parser/unit_registry/quantities.txt new file mode 100644 index 00000000..d658ab26 --- /dev/null +++ b/ereuse_devicehub/parser/unit_registry/quantities.txt @@ -0,0 +1,9 @@ +HZ = hertz = hz +KHZ = kilohertz = khz +MHZ = megahertz = mhz +GHZ = gigahertz = ghz +B = byte = b = UNIT = unit +KB = kilobyte = kb = K = k +MB = megabyte = mb = M = m +GB = gigabyte = gb = G = g +T = terabyte = tb = T = t diff --git a/ereuse_devicehub/parser/utils.py b/ereuse_devicehub/parser/utils.py new file mode 100644 index 00000000..e36990fe --- /dev/null +++ b/ereuse_devicehub/parser/utils.py @@ -0,0 +1,38 @@ +import datetime +import fcntl +import socket +import struct +from contextlib import contextmanager +from enum import Enum + +from ereuse_utils import Dumpeable + + +class Severity(Enum): + Info = 'Info' + Error = 'Error' + + +def get_hw_addr(ifname): + # http://stackoverflow.com/a/4789267/1538221 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15])) + return ':'.join('%02x' % ord(char) for char in info[18:24]) + + +class Measurable(Dumpeable): + """A base class that allows measuring execution times.""" + + def __init__(self) -> None: + super().__init__() + self.elapsed = None + + @contextmanager + def measure(self): + init = datetime.datetime.now(datetime.timezone.utc) + yield + self.elapsed = datetime.datetime.now(datetime.timezone.utc) - init + try: + assert self.elapsed.total_seconds() > 0 + except AssertionError: + self.elapsed = datetime.timedelta(seconds=0) diff --git a/ereuse_devicehub/parser/workbench b/ereuse_devicehub/parser/workbench new file mode 160000 index 00000000..492dca0e --- /dev/null +++ b/ereuse_devicehub/parser/workbench @@ -0,0 +1 @@ +Subproject commit 492dca0eeff5279b53e5dcfbedd333dc5739f1d7