Merge branch 'feature/server-side-render-parser-3021' into feature/list-snapshots-view-#3113

This commit is contained in:
Cayo Puigdefabregas 2022-04-13 17:30:24 +02:00
commit 82bdfe3db5
41 changed files with 9705 additions and 794 deletions

View File

View File

@ -0,0 +1,84 @@
import json
from binascii import Error as asciiError
from flask import Blueprint
from flask import current_app as app
from flask import g, jsonify, request
from flask.views import View
from marshmallow import ValidationError
from werkzeug.exceptions import Unauthorized
from ereuse_devicehub.auth import Auth
from ereuse_devicehub.db import db
from ereuse_devicehub.parser.models import SnapshotErrors
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw
from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.views.snapshot import (
SnapshotMix,
move_json,
save_json,
)
from ereuse_devicehub.resources.enums import Severity
api = Blueprint('api', __name__, url_prefix='/api')
class LoginMix(View):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.authenticate()
def authenticate(self):
unauthorized = Unauthorized('Provide a suitable token.')
basic_token = request.headers.get('Authorization', " ").split(" ")
if not len(basic_token) == 2:
raise unauthorized
token = basic_token[1]
try:
token = Auth.decode(token)
except asciiError:
raise unauthorized
self.user = Auth().authenticate(token)
g.user = self.user
class InventoryView(LoginMix, SnapshotMix):
methods = ['POST']
def dispatch_request(self):
snapshot_json = json.loads(request.data)
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
self.path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json = self.validate(snapshot_json)
try:
self.snapshot_json = ParseSnapshotLsHw(snapshot_json).get_snapshot()
except ValidationError:
self.response = jsonify('')
self.response.status_code = 201
return self.response
snapshot = self.build()
db.session.add(snapshot)
db.session().final_flush()
db.session.commit()
self.response = self.schema.jsonify(snapshot)
self.response.status_code = 201
move_json(self.tmp_snapshots, self.path_snapshot, g.user.email)
return self.response
def validate(self, snapshot_json):
self.schema = Snapshot_lite()
try:
return self.schema.load(snapshot_json)
except ValidationError as err:
txt = "{}".format(err)
uuid = snapshot_json.get('uuid')
error = SnapshotErrors(
description=txt, snapshot_uuid=uuid, severity=Severity.Error
)
error.save(commit=True)
raise err
api.add_url_rule('/inventory/', view_func=InventoryView.as_view('inventory'))

View File

@ -1,26 +1,35 @@
from distutils.version import StrictVersion
from itertools import chain
from typing import Set
from decouple import config
from decouple import config
from teal.auth import TokenAuth
from teal.config import Config
from teal.enums import Currency
from teal.utils import import_resource
from ereuse_devicehub.resources import action, agent, deliverynote, inventory, \
lot, tag, user
from ereuse_devicehub.resources import (
action,
agent,
deliverynote,
inventory,
lot,
tag,
user,
)
from ereuse_devicehub.resources.device import definitions
from ereuse_devicehub.resources.documents import documents
from ereuse_devicehub.resources.tradedocument import definitions as tradedocument
from ereuse_devicehub.resources.enums import PriceSoftware
from ereuse_devicehub.resources.versions import versions
from ereuse_devicehub.resources.licences import licences
from ereuse_devicehub.resources.metric import definitions as metric_def
from ereuse_devicehub.resources.tradedocument import definitions as tradedocument
from ereuse_devicehub.resources.versions import versions
class DevicehubConfig(Config):
RESOURCE_DEFINITIONS = set(chain(import_resource(definitions),
RESOURCE_DEFINITIONS = set(
chain(
import_resource(definitions),
import_resource(action),
import_resource(user),
import_resource(tag),
@ -33,7 +42,8 @@ class DevicehubConfig(Config):
import_resource(versions),
import_resource(licences),
import_resource(metric_def),
),)
),
)
PASSWORD_SCHEMES = {'pbkdf2_sha256'} # type: Set[str]
SECRET_KEY = config('SECRET_KEY')
DB_USER = config('DB_USER', 'dhub')
@ -53,6 +63,7 @@ class DevicehubConfig(Config):
"""The minimum version of ereuse.org workbench that this devicehub
accepts. we recommend not changing this value.
"""
WORKBENCH_LITE = ["1.0.0"]
TMP_SNAPSHOTS = config('TMP_SNAPSHOTS', '/tmp/snapshots')
TMP_LIVES = config('TMP_LIVES', '/tmp/lives')
@ -60,11 +71,7 @@ class DevicehubConfig(Config):
"""This var is for save a snapshots in json format when fail something"""
API_DOC_CONFIG_TITLE = 'Devicehub'
API_DOC_CONFIG_VERSION = '0.2'
API_DOC_CONFIG_COMPONENTS = {
'securitySchemes': {
'bearerAuth': TokenAuth.API_DOCS
}
}
API_DOC_CONFIG_COMPONENTS = {'securitySchemes': {'bearerAuth': TokenAuth.API_DOCS}}
API_DOC_CLASS_DISCRIMINATOR = 'type'
PRICE_SOFTWARE = PriceSoftware.Ereuse

View File

@ -3,8 +3,10 @@ import json
from json.decoder import JSONDecodeError
from boltons.urlutils import URL
from flask import current_app as app
from flask import g, request
from flask_wtf import FlaskForm
from marshmallow import ValidationError
from sqlalchemy import or_
from sqlalchemy.util import OrderedSet
from wtforms import (
@ -25,14 +27,19 @@ from wtforms import (
from wtforms.fields import FormField
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import RateComputer, Snapshot, Trade
from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate
from ereuse_devicehub.parser.models import SnapshotErrors
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw
from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.models import Snapshot, Trade
from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema
from ereuse_devicehub.resources.action.views.snapshot import move_json, save_json
from ereuse_devicehub.resources.action.views.snapshot import (
SnapshotMix,
move_json,
save_json,
)
from ereuse_devicehub.resources.device.models import (
SAI,
Cellphone,
Computer,
Device,
Keyboard,
MemoryCardReader,
@ -43,12 +50,11 @@ from ereuse_devicehub.resources.device.models import (
)
from ereuse_devicehub.resources.device.sync import Sync
from ereuse_devicehub.resources.documents.models import DataWipeDocument
from ereuse_devicehub.resources.enums import Severity, SnapshotSoftware
from ereuse_devicehub.resources.enums import Severity
from ereuse_devicehub.resources.hash_reports import insert_hash
from ereuse_devicehub.resources.lot.models import Lot
from ereuse_devicehub.resources.tag.model import Tag
from ereuse_devicehub.resources.tradedocument.models import TradeDocument
from ereuse_devicehub.resources.user.exceptions import InsufficientPermission
from ereuse_devicehub.resources.user.models import User
DEVICES = {
@ -135,7 +141,7 @@ class LotForm(FlaskForm):
return self.instance
class UploadSnapshotForm(FlaskForm):
class UploadSnapshotForm(FlaskForm, SnapshotMix):
snapshot = MultipleFileField('Select a Snapshot File', [validators.DataRequired()])
def validate(self, extra_validators=None):
@ -175,20 +181,47 @@ class UploadSnapshotForm(FlaskForm):
return True
def is_wb_lite_snapshot(self, version: str) -> bool:
is_lite = False
if version in app.config['WORKBENCH_LITE']:
is_lite = True
return is_lite
def save(self, commit=True):
if any([x == 'Error' for x in self.result.values()]):
return
# result = []
self.sync = Sync()
schema = SnapshotSchema()
# self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
# TODO @cayop get correct var config
self.tmp_snapshots = '/tmp/'
schema_lite = Snapshot_lite()
devices = []
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
for filename, snapshot_json in self.snapshots:
path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json.pop('debug', None)
version = snapshot_json.get('schema_api')
if self.is_wb_lite_snapshot(version):
self.snapshot_json = schema_lite.load(snapshot_json)
snapshot_json = ParseSnapshotLsHw(self.snapshot_json).snapshot_json
try:
snapshot_json = schema.load(snapshot_json)
except ValidationError as err:
txt = "{}".format(err)
uuid = snapshot_json.get('uuid')
wbid = snapshot_json.get('wbid')
error = SnapshotErrors(
description=txt,
snapshot_uuid=uuid,
severity=Severity.Error,
wbid=wbid,
)
error.save(commit=True)
self.result[filename] = 'Error'
continue
response = self.build(snapshot_json)
db.session.add(response)
devices.append(response.device)
if hasattr(response, 'type'):
self.result[filename] = 'Ok'
@ -199,69 +232,7 @@ class UploadSnapshotForm(FlaskForm):
if commit:
db.session.commit()
return response
def build(self, snapshot_json): # noqa: C901
# this is a copy adaptated from ereuse_devicehub.resources.action.views.snapshot
device = snapshot_json.pop('device') # type: Computer
components = None
if snapshot_json['software'] == (
SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid
):
components = snapshot_json.pop('components', None)
if isinstance(device, Computer) and device.hid:
device.add_mac_to_hid(components_snap=components)
snapshot = Snapshot(**snapshot_json)
# Remove new actions from devices so they don't interfere with sync
actions_device = set(e for e in device.actions_one)
device.actions_one.clear()
if components:
actions_components = tuple(
set(e for e in c.actions_one) for c in components
)
for component in components:
component.actions_one.clear()
assert not device.actions_one
assert all(not c.actions_one for c in components) if components else True
db_device, remove_actions = self.sync.run(device, components)
del device # Do not use device anymore
snapshot.device = db_device
snapshot.actions |= remove_actions | actions_device # Set actions to snapshot
# commit will change the order of the components by what
# the DB wants. Let's get a copy of the list so we preserve order
ordered_components = OrderedSet(x for x in snapshot.components)
# Add the new actions to the db-existing devices and components
db_device.actions_one |= actions_device
if components:
for component, actions in zip(ordered_components, actions_components):
component.actions_one |= actions
snapshot.actions |= actions
if snapshot.software == SnapshotSoftware.Workbench:
# Check ownership of (non-component) device to from current.user
if db_device.owner_id != g.user.id:
raise InsufficientPermission()
# Compute ratings
try:
rate_computer, price = RateComputer.compute(db_device)
except CannotRate:
pass
else:
snapshot.actions.add(rate_computer)
if price:
snapshot.actions.add(price)
elif snapshot.software == SnapshotSoftware.WorkbenchAndroid:
pass # TODO try except to compute RateMobile
# Check if HID is null and add Severity:Warning to Snapshot
if snapshot.device.hid is None:
snapshot.severity = Severity.Warning
db.session.add(snapshot)
return snapshot
return self.result, devices
class NewDeviceForm(FlaskForm):

View File

@ -231,10 +231,11 @@ class UploadSnapshotView(GenericMixView):
'version': __version__,
}
if form.validate_on_submit():
snapshot = form.save(commit=False)
snapshot, devices = form.save(commit=False)
if lot_id:
lot = lots.filter(Lot.id == lot_id).one()
lot.devices.add(snapshot.device)
for dev in devices:
lot.devices.add(dev)
db.session.add(lot)
db.session.commit()

View File

@ -0,0 +1,42 @@
"""change firewire
Revision ID: 17288b2a7440
Revises: 8571fb32c912
Create Date: 2022-03-29 11:49:39.270791
"""
import citext
import sqlalchemy as sa
from alembic import context, op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '17288b2a7440'
down_revision = '8571fb32c912'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.add_column(
'computer',
sa.Column('uuid', postgresql.UUID(as_uuid=True), nullable=True),
schema=f'{get_inv()}',
)
op.add_column(
'snapshot',
sa.Column('wbid', citext.CIText(), nullable=True),
schema=f'{get_inv()}',
)
def downgrade():
op.drop_column('computer', 'uuid', schema=f'{get_inv()}')
op.drop_column('snapshot', 'wbid', schema=f'{get_inv()}')

View File

@ -0,0 +1,56 @@
"""add snapshot errors
Revision ID: 23d9e7ebbd7d
Revises: 17288b2a7440
Create Date: 2022-04-04 19:27:48.675387
"""
import citext
import sqlalchemy as sa
from alembic import context, op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '23d9e7ebbd7d'
down_revision = '17288b2a7440'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.create_table(
'snapshot_errors',
sa.Column(
'updated',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='The last time Devicehub recorded a change for \n this thing.\n ',
),
sa.Column(
'created',
sa.TIMESTAMP(timezone=True),
server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='When Devicehub created this.',
),
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('description', citext.CIText(), nullable=False),
sa.Column('snapshot_uuid', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('severity', sa.SmallInteger(), nullable=False),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}',
)
op.execute(f"CREATE SEQUENCE {get_inv()}.snapshot_errors_seq START 1;")
def downgrade():
op.drop_table('snapshot_errors', schema=f'{get_inv()}')
op.execute(f"DROP SEQUENCE {get_inv()}.snapshot_errors_seq;")

View File

@ -0,0 +1,57 @@
"""add wbid user in snapshotErrors
Revision ID: 97bef94f7982
Revises: 23d9e7ebbd7d
Create Date: 2022-04-12 09:27:59.670911
"""
import citext
import sqlalchemy as sa
from alembic import context, op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '97bef94f7982'
down_revision = '23d9e7ebbd7d'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.add_column(
'snapshot_errors',
sa.Column('wbid', citext.CIText(), nullable=True),
schema=f'{get_inv()}',
)
op.add_column(
'snapshot_errors',
sa.Column('owner_id', postgresql.UUID(), nullable=True),
schema=f'{get_inv()}',
)
op.create_foreign_key(
"fk_snapshot_errors_owner_id_user_id",
"snapshot_errors",
"user",
["owner_id"],
["id"],
ondelete="SET NULL",
source_schema=f'{get_inv()}',
referent_schema='common',
)
def downgrade():
op.drop_constraint(
"fk_snapshot_errors_owner_id_user_id",
"snapshot_errors",
type_="foreignkey",
schema=f'{get_inv()}',
)
op.drop_column('snapshot_errors', 'owner_id', schema=f'{get_inv()}')

View File

@ -0,0 +1,25 @@
from pathlib import Path
from pint import UnitRegistry
# Sets up the unit handling
unit_registry = Path(__file__).parent / 'unit_registry'
unit = UnitRegistry()
unit.load_definitions(str(unit_registry / 'quantities.txt'))
TB = unit.TB
GB = unit.GB
MB = unit.MB
Mbs = unit.Mbit / unit.s
MBs = unit.MB / unit.s
Hz = unit.Hz
GHz = unit.GHz
MHz = unit.MHz
Inch = unit.inch
mAh = unit.hour * unit.mA
mV = unit.mV
base2 = UnitRegistry()
base2.load_definitions(str(unit_registry / 'base2.quantities.txt'))
GiB = base2.GiB

View File

@ -0,0 +1,444 @@
import re
from contextlib import suppress
from datetime import datetime
from fractions import Fraction
from math import hypot
from typing import Iterator, List, Optional, Type, TypeVar
import dateutil.parser
from ereuse_utils import getter, text
from ereuse_utils.nested_lookup import (
get_nested_dicts_with_key_containing_value,
get_nested_dicts_with_key_value,
)
from ereuse_devicehub.parser import base2, unit, utils
from ereuse_devicehub.parser.utils import Dumpeable
class Device(Dumpeable):
"""
Base class for a computer and each component, containing
its physical characteristics (like serial number) and Devicehub
actions. For Devicehub actions, this class has an interface to execute
:meth:`.benchmarks`.
"""
def __init__(self, *sources) -> None:
"""Gets the device information."""
self.actions = set()
self.type = self.__class__.__name__
super().__init__()
def from_lshw(self, lshw_node: dict):
self.manufacturer = getter.dict(lshw_node, 'vendor', default=None, type=str)
self.model = getter.dict(
lshw_node,
'product',
remove={self.manufacturer} if self.manufacturer else set(),
default=None,
type=str,
)
self.serial_number = getter.dict(lshw_node, 'serial', default=None, type=str)
def __str__(self) -> str:
return ' '.join(x for x in (self.model, self.serial_number) if x)
C = TypeVar('C', bound='Component')
class Component(Device):
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
raise NotImplementedError()
class Processor(Component):
@classmethod
def new(cls, lshw: dict, **kwargs) -> Iterator[C]:
nodes = get_nested_dicts_with_key_value(lshw, 'class', 'processor')
# We want only the physical cpu's, not the logic ones
# In some cases we may get empty cpu nodes, we can detect them because
# all regular cpus have at least a description (Intel Core i5...)
return (
cls(node)
for node in nodes
if 'logical' not in node['id']
and node.get('description', '').lower() != 'co-processor'
and not node.get('disabled')
and 'co-processor' not in node.get('model', '').lower()
and 'co-processor' not in node.get('description', '').lower()
and 'width' in node
)
def __init__(self, node: dict) -> None:
super().__init__(node)
self.from_lshw(node)
self.speed = unit.Quantity(node['size'], node['units']).to('gigahertz').m
self.address = node['width']
try:
self.cores = int(node['configuration']['cores'])
self.threads = int(node['configuration']['threads'])
except KeyError:
self.threads = 1
self.cores = 1
self.serial_number = None # Processors don't have valid SN :-(
self.brand, self.generation = self.processor_brand_generation(self.model)
assert not hasattr(self, 'cores') or 1 <= self.cores <= 16
@staticmethod # noqa: C901
def processor_brand_generation(model: str):
"""Generates the ``brand`` and ``generation`` fields for the given model.
This returns a tuple with:
- The brand as a string or None.
- The generation as an int or None.
Intel desktop processor numbers:
https://www.intel.com/content/www/us/en/processors/processor-numbers.html
Intel server processor numbers:
https://www.intel.com/content/www/us/en/processors/processor-numbers-data-center.html
"""
if 'Duo' in model:
return 'Core2 Duo', None
if 'Quad' in model:
return 'Core2 Quad', None
if 'Atom' in model:
return 'Atom', None
if 'Celeron' in model:
return 'Celeron', None
if 'Pentium' in model:
return 'Pentium', None
if 'Xeon Platinum' in model:
generation = int(re.findall(r'\bPlatinum \d{4}\w', model)[0][10])
return 'Xeon Platinum', generation
if 'Xeon Gold' in model:
generation = int(re.findall(r'\bGold \d{4}\w', model)[0][6])
return 'Xeon Gold', generation
if 'Xeon' in model: # Xeon E5...
generation = 1
results = re.findall(r'\bV\d\b', model) # find V1, V2...
if results:
generation = int(results[0][1])
return 'Xeon', generation
results = re.findall(r'\bi\d-\w+', model) # i3-XXX..., i5-XXX...
if results: # i3, i5...
return 'Core i{}'.format(results[0][1]), int(results[0][3])
results = re.findall(r'\bi\d CPU \w+', model)
if results: # i3 CPU XXX
return 'Core i{}'.format(results[0][1]), 1
results = re.findall(r'\bm\d-\w+', model) # m3-XXXX...
if results:
return 'Core m{}'.format(results[0][1]), None
return None, None
def __str__(self) -> str:
return super().__str__() + (
' ({} generation)'.format(self.generation) if self.generation else ''
)
class RamModule(Component):
@classmethod
def new(cls, lshw, **kwargs) -> Iterator[C]:
# We can get flash memory (BIOS?), system memory and unknown types of memory
memories = get_nested_dicts_with_key_value(lshw, 'class', 'memory')
TYPES = {'ddr', 'sdram', 'sodimm'}
for memory in memories:
physical_ram = any(
t in memory.get('description', '').lower() for t in TYPES
)
not_empty = 'size' in memory
if physical_ram and not_empty:
yield cls(memory)
def __init__(self, node: dict) -> None:
# Node with no size == empty ram slot
super().__init__(node)
self.from_lshw(node)
description = node['description'].upper()
self.format = 'SODIMM' if 'SODIMM' in description else 'DIMM'
self.size = base2.Quantity(node['size'], node['units']).to('MiB').m
# self.size = int(utils.convert_capacity(node['size'], node['units'], 'MB'))
for w in description.split():
if w.startswith('DDR'): # We assume all DDR are SDRAM
self.interface = w
break
elif w.startswith('SDRAM'):
# Fallback. SDRAM is generic denomination for DDR types.
self.interface = w
if 'clock' in node:
self.speed = unit.Quantity(node['clock'], 'Hz').to('MHz').m
assert not hasattr(self, 'speed') or 100.0 <= self.speed <= 1000000000000.0
def __str__(self) -> str:
return '{} {} {}'.format(super().__str__(), self.format, self.size)
class GraphicCard(Component):
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
nodes = get_nested_dicts_with_key_value(lshw, 'class', 'display')
return (cls(n) for n in nodes if n['configuration'].get('driver', None))
def __init__(self, node: dict) -> None:
super().__init__(node)
self.from_lshw(node)
self.memory = self._memory(node['businfo'].split('@')[1])
@staticmethod
def _memory(bus_info):
"""The size of the memory of the gpu."""
return None
def __str__(self) -> str:
return '{} with {}'.format(super().__str__(), self.memory)
class Motherboard(Component):
INTERFACES = 'usb', 'firewire', 'serial', 'pcmcia'
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> C:
node = next(get_nested_dicts_with_key_value(lshw, 'description', 'Motherboard'))
bios_node = next(get_nested_dicts_with_key_value(lshw, 'id', 'firmware'))
# bios_node = '1'
memory_array = next(
getter.indents(hwinfo, 'Physical Memory Array', indent=' '), None
)
return cls(node, bios_node, memory_array)
def __init__(
self, node: dict, bios_node: dict, memory_array: Optional[List[str]]
) -> None:
super().__init__(node)
self.from_lshw(node)
self.usb = self.num_interfaces(node, 'usb')
self.firewire = self.num_interfaces(node, 'firewire')
self.serial = self.num_interfaces(node, 'serial')
self.pcmcia = self.num_interfaces(node, 'pcmcia')
self.slots = int(2)
# run(
# 'dmidecode -t 17 | ' 'grep -o BANK | ' 'wc -l',
# check=True,
# universal_newlines=True,
# shell=True,
# stdout=PIPE,
# ).stdout
self.bios_date = dateutil.parser.parse(bios_node['date']).isoformat()
self.version = bios_node['version']
self.ram_slots = self.ram_max_size = None
if memory_array:
self.ram_slots = getter.kv(memory_array, 'Slots', default=None)
self.ram_max_size = getter.kv(memory_array, 'Max. Size', default=None)
if self.ram_max_size:
self.ram_max_size = next(text.numbers(self.ram_max_size))
@staticmethod
def num_interfaces(node: dict, interface: str) -> int:
interfaces = get_nested_dicts_with_key_containing_value(node, 'id', interface)
if interface == 'usb':
interfaces = (
c
for c in interfaces
if 'usbhost' not in c['id'] and 'usb' not in c['businfo']
)
return len(tuple(interfaces))
def __str__(self) -> str:
return super().__str__()
class NetworkAdapter(Component):
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
nodes = get_nested_dicts_with_key_value(lshw, 'class', 'network')
return (cls(node) for node in nodes)
def __init__(self, node: dict) -> None:
super().__init__(node)
self.from_lshw(node)
self.speed = None
if 'capacity' in node:
self.speed = unit.Quantity(node['capacity'], 'bit/s').to('Mbit/s').m
if 'logicalname' in node: # todo this was taken from 'self'?
# If we don't have logicalname it means we don't have the
# (proprietary) drivers fot that NetworkAdaptor
# which means we can't access at the MAC address
# (note that S/N == MAC) "sudo /sbin/lspci -vv" could bring
# the MAC even if no drivers are installed however more work
# has to be done in ensuring it is reliable, really needed,
# and to parse it
# https://www.redhat.com/archives/redhat-list/2010-October/msg00066.html
# workbench-live includes proprietary firmwares
self.serial_number = self.serial_number or utils.get_hw_addr(
node['logicalname']
)
self.variant = node.get('version', None)
self.wireless = bool(node.get('configuration', {}).get('wireless', False))
def __str__(self) -> str:
return '{} {} {}'.format(
super().__str__(), self.speed, 'wireless' if self.wireless else 'ethernet'
)
class SoundCard(Component):
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
nodes = get_nested_dicts_with_key_value(lshw, 'class', 'multimedia')
return (cls(node) for node in nodes)
def __init__(self, node) -> None:
super().__init__(node)
self.from_lshw(node)
class Display(Component):
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
"""Display technologies"""
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
for node in getter.indents(hwinfo, 'Monitor'):
yield cls(node)
def __init__(self, node: dict) -> None:
super().__init__(node)
self.model = getter.kv(node, 'Model')
self.manufacturer = getter.kv(node, 'Vendor')
self.serial_number = getter.kv(node, 'Serial ID', default=None, type=str)
self.resolution_width, self.resolution_height, refresh_rate = text.numbers(
getter.kv(node, 'Resolution')
)
self.refresh_rate = unit.Quantity(refresh_rate, 'Hz').m
with suppress(StopIteration):
# some monitors can have several resolutions, and the one
# in "Detailed Timings" seems the highest one
timings = next(getter.indents(node, 'Detailed Timings', indent=' '))
self.resolution_width, self.resolution_height = text.numbers(
getter.kv(timings, 'Resolution')
)
x, y = (
unit.convert(v, 'millimeter', 'inch')
for v in text.numbers(getter.kv(node, 'Size'))
)
self.size = hypot(x, y)
self.technology = next((t for t in self.TECHS if t in node[0]), None)
d = '{} {} 0'.format(
getter.kv(node, 'Year of Manufacture'),
getter.kv(node, 'Week of Manufacture'),
)
# We assume it has been produced the first day of such week
self.production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self._aspect_ratio = Fraction(self.resolution_width, self.resolution_height)
def __str__(self) -> str:
return (
'{0} {1.resolution_width}x{1.resolution_height} {1.size} inches {2}'.format(
super().__str__(), self, self._aspect_ratio
)
)
class Computer(Device):
CHASSIS_TYPE = {
'Desktop': {
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
},
'Laptop': {
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
},
'Server': {'server'},
'Computer': {'_virtual'},
}
"""
A translation dictionary whose keys are Devicehub types and values
are possible chassis values that `dmi <https://ezix.org/src/pkg/
lshw/src/master/src/core/dmi.cc#L632>`_ can offer.
"""
CHASSIS_DH = {
'Tower': {'desktop', 'low-profile', 'tower', 'server'},
'Docking': {'docking'},
'AllInOne': {'all-in-one'},
'Microtower': {'mini-tower', 'space-saving', 'mini'},
'PizzaBox': {'pizzabox'},
'Lunchbox': {'lunchbox'},
'Stick': {'stick'},
'Netbook': {'notebook', 'sub-notebook'},
'Handheld': {'handheld'},
'Laptop': {'portable', 'laptop'},
'Convertible': {'convertible'},
'Detachable': {'detachable'},
'Tablet': {'tablet'},
'Virtual': {'_virtual'},
}
"""
A conversion table from DMI's chassis type value Devicehub
chassis value.
"""
COMPONENTS = list(Component.__subclasses__()) # type: List[Type[Component]]
COMPONENTS.remove(Motherboard)
def __init__(self, node: dict) -> None:
super().__init__(node)
self.from_lshw(node)
chassis = node.get('configuration', {}).get('chassis', '_virtual')
self.type = next(
t for t, values in self.CHASSIS_TYPE.items() if chassis in values
)
self.chassis = next(
t for t, values in self.CHASSIS_DH.items() if chassis in values
)
self.sku = getter.dict(node, ('configuration', 'sku'), default=None, type=str)
self.version = getter.dict(node, 'version', default=None, type=str)
self._ram = None
@classmethod
def run(cls, lshw, hwinfo_raw):
"""
Gets hardware information from the computer and its components,
like serial numbers or model names, and benchmarks them.
This function uses ``LSHW`` as the main source of hardware information,
which is obtained once when it is instantiated.
"""
hwinfo = hwinfo_raw.splitlines()
computer = cls(lshw)
components = []
for Component in cls.COMPONENTS:
if Component == Display and computer.type != 'Laptop':
continue # Only get display info when computer is laptop
components.extend(Component.new(lshw=lshw, hwinfo=hwinfo))
components.append(Motherboard.new(lshw, hwinfo))
computer._ram = sum(
ram.size for ram in components if isinstance(ram, RamModule)
)
return computer, components
def __str__(self) -> str:
specs = super().__str__()
return '{} with {} MB of RAM.'.format(specs, self._ram)

View File

@ -0,0 +1,32 @@
from citext import CIText
from flask import g
from sqlalchemy import BigInteger, Column, Sequence, SmallInteger
from sqlalchemy.dialects.postgresql import UUID
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.enums import Severity
from ereuse_devicehub.resources.models import Thing
from ereuse_devicehub.resources.user.models import User
class SnapshotErrors(Thing):
"""A Snapshot errors."""
id = Column(BigInteger, Sequence('snapshot_errors_seq'), primary_key=True)
description = Column(CIText(), default='', nullable=False)
wbid = Column(CIText(), nullable=True)
severity = Column(SmallInteger, default=Severity.Info, nullable=False)
snapshot_uuid = Column(UUID(as_uuid=True), nullable=False)
owner_id = db.Column(
UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id,
)
owner = db.relationship(User, primaryjoin=owner_id == User.id)
def save(self, commit=False):
db.session.add(self)
if commit:
db.session.commit()

View File

@ -0,0 +1,548 @@
import json
import logging
import uuid
from dmidecode import DMIParse
from marshmallow import ValidationError
from ereuse_devicehub.parser import base2
from ereuse_devicehub.parser.computer import Computer
from ereuse_devicehub.parser.models import SnapshotErrors
from ereuse_devicehub.resources.action.schemas import Snapshot
from ereuse_devicehub.resources.enums import DataStorageInterface, Severity
logger = logging.getLogger(__name__)
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"]["dmidecode"]
self.smart_raw = snapshot["data"]["smart"]
self.hwinfo_raw = snapshot["data"]["hwinfo"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.hwinfo = self.parse_hwinfo()
self.set_basic_datas()
self.set_components()
self.snapshot_json = {
"device": self.device,
"software": "Workbench",
"components": self.components,
"uuid": snapshot['uuid'],
"type": snapshot['type'],
"version": "14.0.0",
"endTime": snapshot["timestamp"],
"elapsed": 1,
"wbid": snapshot["wbid"],
}
def get_snapshot(self):
return Snapshot().load(self.snapshot_json)
def set_basic_datas(self):
self.device['manufacturer'] = self.dmi.manufacturer()
self.device['model'] = self.dmi.model()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['uuid'] = self.get_uuid()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
self.get_data_storage()
self.get_networks()
def get_cpu(self):
# TODO @cayop generation, brand and address not exist in dmidecode
for cpu in self.dmi.get('Processor'):
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": cpu.get('Serial Number'),
"generation": cpu.get('Generation'),
"brand": cpu.get('Brand'),
"address": cpu.get('Address'),
}
)
def get_ram(self):
# TODO @cayop format and model not exist in dmidecode
for ram in self.dmi.get("Memory Device"):
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": self.get_ram_type(ram),
"format": self.get_ram_format(ram),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self):
# TODO @cayop model, not exist in dmidecode
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number"),
"manufacturer": moder_board.get("Manufacturer"),
"biosDate": self.get_bios_date(),
# "firewire": self.get_firmware(),
"ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(),
"model": moder_board.get("Product Name"), # ??
"pcmcia": self.get_pcmcia_num(), # ??
"serial": self.get_serial_num(), # ??
"usb": self.get_usb_num(),
}
)
def get_usb_num(self):
return len(
[u for u in self.dmi.get("Port Connector") if u.get("Port Type") == "USB"]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if u.get("Port Type") == "SERIAL"
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if u.get("Port Type") == "PCMCIA"
]
)
def get_bios_date(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
size = ram.get("Size", "0")
return int(size.split(" ")[0])
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return int(size.split(" ")[0])
def get_ram_type(self, ram):
TYPES = {'ddr', 'sdram', 'sodimm'}
for t in TYPES:
if t in ram.get("Type", "DDR"):
return t
def get_ram_format(self, ram):
channel = ram.get("Locator", "DIMM")
return 'SODIMM' if 'SODIMM' in channel else 'DIMM'
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return float(speed.split(" ")[0]) / 1024
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default)
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default)
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", self.default)
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", self.default)
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_data_storage(self):
for sm in self.smart:
model = sm.get('model_name')
manufacturer = None
if len(model.split(" ")) == 2:
manufacturer, model = model.split(" ")
self.components.append(
{
"actions": [],
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
return SSD if type_dev in SSDS else HDD
def get_data_storage_interface(self, x):
return x.get('device', {}).get('protocol', 'ATA')
def get_data_storage_size(self, x):
type_dev = x.get('device', {}).get('type')
total_capacity = "{type}_total_capacity".format(type=type_dev)
# convert bytes to Mb
return x.get(total_capacity) / 1024**2
def get_networks(self):
hw_class = " Hardware Class: "
mac = " Permanent HW Address: "
model = " Model: "
wireless = "wireless"
for line in self.hwinfo:
iface = {
"variant": "1",
"actions": [],
"speed": 100.0,
"type": "NetworkAdapter",
"wireless": False,
"manufacturer": "Ethernet",
}
for y in line:
if hw_class in y and not y.split(hw_class)[1] == 'network':
break
if mac in y:
iface["serialNumber"] = y.split(mac)[1]
if model in y:
iface["model"] = y.split(model)[1]
if wireless in y:
iface["wireless"] = True
if iface.get("serialNumber"):
self.components.append(iface)
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
return json.loads(x)
return x
class ParseSnapshotLsHw:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.uuid = snapshot.get("uuid")
self.wbid = snapshot.get("wbid")
self.dmidecode_raw = snapshot["data"]["dmidecode"]
self.smart = snapshot["data"]["smart"]
self.hwinfo_raw = snapshot["data"]["hwinfo"]
self.lshw = snapshot["data"]["lshw"]
self.device = {"actions": []}
self.components = []
self.components_obj = []
self._errors = []
self.dmi = DMIParse(self.dmidecode_raw)
self.hwinfo = self.parse_hwinfo()
self.set_basic_datas()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": "Workbench",
"components": self.components,
"uuid": snapshot['uuid'],
"version": "14.0.0",
"endTime": snapshot["timestamp"],
"elapsed": 1,
"wbid": snapshot["wbid"],
}
def get_snapshot(self):
try:
return Snapshot().load(self.snapshot_json)
except ValidationError as err:
txt = "{}".format(err)
uuid = self.snapshot_json.get('uuid')
wbid = self.snapshot_json.get('wbid')
error = SnapshotErrors(
description=txt, snapshot_uuid=uuid, severity=Severity.Error, wbid=wbid
)
error.save(commit=True)
raise err
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
return json.loads(x)
return x
def set_basic_datas(self):
pc, self.components_obj = Computer.run(self.lshw, self.hwinfo_raw)
self.device = pc.dump()
self.device['uuid'] = self.get_uuid()
def set_components(self):
memory = None
for x in self.components_obj:
if x.type == 'RamModule':
memory = 1
if x.type == 'Motherboard':
x.slots = self.get_ram_slots()
self.components.append(x.dump())
if not memory:
self.get_ram()
self.get_data_storage()
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": self.get_ram_type(ram),
"format": self.get_ram_format(ram),
"model": ram.get("Part Number", self.default),
}
)
def get_ram_size(self, ram):
size = ram.get("Size")
if not len(size.split(" ")) == 2:
txt = "Error: Snapshot: {uuid}, tag: {wbid} have this ram Size: {size}".format(
uuid=self.uuid, size=size, wbid=self.wbid
)
self.errors(txt)
return 128
size, units = size.split(" ")
return base2.Quantity(float(size), units).to('MiB').m
def get_ram_speed(self, ram):
speed = ram.get("Speed", "100")
if not len(speed.split(" ")) == 2:
txt = "Error: Snapshot: {uuid}, tag: {wbid} have this ram Speed: {speed}".format(
uuid=self.uuid, speed=speed, wbid=self.wbid
)
self.errors(txt)
return 100
speed, units = speed.split(" ")
return float(speed)
# TODO @cayop is neccesary change models for accept sizes more high of speed or change to string
# return base2.Quantity(float(speed), units).to('MHz').m
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_type(self, ram):
TYPES = {'ddr', 'sdram', 'sodimm'}
for t in TYPES:
if t in ram.get("Type", "DDR"):
return t
def get_ram_format(self, ram):
channel = ram.get("Locator", "DIMM")
return 'SODIMM' if 'SODIMM' in channel else 'DIMM'
def get_uuid(self):
dmi_uuid = 'undefined'
if self.dmi.get("System"):
dmi_uuid = self.dmi.get("System")[0].get("UUID")
try:
uuid.UUID(dmi_uuid)
except (ValueError, AttributeError) as err:
self.errors("{}".format(err))
txt = "Error: Snapshot: {uuid} tag: {wbid} have this uuid: {device}".format(
uuid=self.uuid, device=dmi_uuid, wbid=self.wbid
)
self.errors(txt)
dmi_uuid = None
return dmi_uuid
def get_data_storage(self):
for sm in self.smart:
model = sm.get('model_name')
manufacturer = None
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": [self.get_test_data_storage(sm)],
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get("trim", {}).get("supported") == "true"
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
try:
DataStorageInterface(interface.upper())
except ValueError as err:
txt = "tag: {}, interface {} is not in DataStorageInterface Enum".format(
interface, self.wbid
)
self.errors("{}".format(err))
self.errors(txt)
return "ATA"
def get_data_storage_size(self, x):
type_dev = x.get('device', {}).get('protocol', '').lower()
total_capacity = "{type}_total_capacity".format(type=type_dev)
if not x.get(total_capacity):
return 1
# convert bytes to Mb
return x.get(total_capacity) / 1024**2
def get_test_data_storage(self, smart):
log = "smart_health_information_log"
action = {
"status": "Completed without error",
"reallocatedSectorCount": smart.get("reallocated_sector_count", 0),
"currentPendingSectorCount": smart.get("current_pending_sector_count", 0),
"assessment": True,
"severity": "Info",
"offlineUncorrectable": smart.get("offline_uncorrectable", 0),
"lifetime": 0,
"type": "TestDataStorage",
"length": "Short",
"elapsed": 0,
"reportedUncorrectableErrors": smart.get(
"reported_uncorrectable_errors", 0
),
"powerCycleCount": smart.get("power_cycle_count", 0),
}
for k in smart.keys():
if log in k:
action['lifetime'] = smart[k].get("power_on_hours", 0)
action['powerOnHours'] = smart[k].get("power_on_hours", 0)
return action
def errors(self, txt=None, severity=Severity.Info):
if not txt:
return self._errors
logger.error(txt)
self._errors.append(txt)
error = SnapshotErrors(
description=txt, snapshot_uuid=self.uuid, severity=severity, wbid=self.wbid
)
error.save()

View File

@ -0,0 +1,36 @@
from flask import current_app as app
from marshmallow import Schema as MarshmallowSchema
from marshmallow import ValidationError, validates_schema
from marshmallow.fields import Dict, List, Nested, String
from ereuse_devicehub.resources.schemas import Thing
class Snapshot_lite_data(MarshmallowSchema):
dmidecode = String(required=False)
hwinfo = String(required=False)
smart = List(Dict(), required=False)
lshw = Dict(required=False)
lspci = String(required=False)
class Snapshot_lite(Thing):
uuid = String(required=True)
version = String(required=True)
schema_api = String(required=True)
software = String(required=True)
wbid = String(required=True)
type = String(required=True)
timestamp = String(required=True)
data = Nested(Snapshot_lite_data)
@validates_schema
def validate_workbench_version(self, data: dict):
if data['schema_api'] not in app.config['WORKBENCH_LITE']:
raise ValidationError(
'Min. supported Workbench version is '
'{} but yours is {}.'.format(
app.config['WORKBENCH_LITE'][0], data['version']
),
field_names=['version'],
)

View File

@ -0,0 +1,38 @@
from datetime import datetime, timezone
from typing import List
from ereuse_workbench.computer import Component, Computer, DataStorage
from ereuse_workbench.utils import Dumpeable
class Snapshot(Dumpeable):
"""
Generates the Snapshot report for Devicehub by obtaining the
data from the computer, performing benchmarks and tests...
After instantiating the class, run :meth:`.computer` before any
other method.
"""
def __init__(self, uuid, software, version, lshw, hwinfo):
self.type = 'Snapshot'
self.uuid = uuid
self.software = software
self.version = version
self.lshw = lshw
self.hwinfo = hwinfo
self.endTime = datetime.now(timezone.utc)
self.closed = False
self.elapsed = None
self.device = None # type: Computer
self.components = None # type: List[Component]
self._storages = None
def computer(self):
"""Retrieves information about the computer and components."""
self.device, self.components = Computer.run(self.lshw, self.hwinfo)
self._storages = tuple(c for c in self.components if isinstance(c, DataStorage))
def close(self):
"""Closes the Snapshot"""
self.closed = True

View File

@ -0,0 +1,4 @@
K = KiB = k = kb = KB
M = MiB = m = mb = MB
G = GiB = g = gb = GB
T = TiB = t = tb = TB

View File

@ -0,0 +1,9 @@
HZ = hertz = hz
KHZ = kilohertz = khz
MHZ = megahertz = mhz
GHZ = gigahertz = ghz
B = byte = b = UNIT = unit
KB = kilobyte = kb = K = k
MB = megabyte = mb = M = m
GB = gigabyte = gb = G = g
T = terabyte = tb = T = t

View File

@ -0,0 +1,38 @@
import datetime
import fcntl
import socket
import struct
from contextlib import contextmanager
from enum import Enum
from ereuse_utils import Dumpeable
class Severity(Enum):
Info = 'Info'
Error = 'Error'
def get_hw_addr(ifname):
# http://stackoverflow.com/a/4789267/1538221
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
return ':'.join('%02x' % ord(char) for char in info[18:24])
class Measurable(Dumpeable):
"""A base class that allows measuring execution times."""
def __init__(self) -> None:
super().__init__()
self.elapsed = None
@contextmanager
def measure(self):
init = datetime.datetime.now(datetime.timezone.utc)
yield
self.elapsed = datetime.datetime.now(datetime.timezone.utc) - init
try:
assert self.elapsed.total_seconds() > 0
except AssertionError:
self.elapsed = datetime.timedelta(seconds=0)

File diff suppressed because it is too large Load Diff

View File

@ -1,14 +1,28 @@
import copy
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from flask import current_app as app, g
from marshmallow import Schema as MarshmallowSchema, ValidationError, fields as f, validates_schema, pre_load, post_load
from marshmallow.fields import Boolean, DateTime, Decimal, Float, Integer, Nested, String, \
TimeDelta, UUID
from flask import current_app as app
from flask import g
from marshmallow import Schema as MarshmallowSchema
from marshmallow import ValidationError
from marshmallow import fields as f
from marshmallow import post_load, pre_load, validates_schema
from marshmallow.fields import (
UUID,
Boolean,
DateTime,
Decimal,
Float,
Integer,
Nested,
String,
TimeDelta,
)
from marshmallow.validate import Length, OneOf, Range
from sqlalchemy.util import OrderedSet
from teal.enums import Country, Currency, Subdivision
from teal.marshmallow import EnumField, IP, SanitizedStr, URL, Version
from teal.marshmallow import IP, URL, EnumField, SanitizedStr, Version
from teal.resource import Schema
from ereuse_devicehub.marshmallow import NestedOn
@ -16,24 +30,32 @@ from ereuse_devicehub.resources import enums
from ereuse_devicehub.resources.action import models as m
from ereuse_devicehub.resources.agent import schemas as s_agent
from ereuse_devicehub.resources.device import schemas as s_device
from ereuse_devicehub.resources.tradedocument import schemas as s_document
from ereuse_devicehub.resources.documents import schemas as s_generic_document
from ereuse_devicehub.resources.enums import AppearanceRange, BiosAccessRange, FunctionalityRange, \
PhysicalErasureMethod, R_POSITIVE, RatingRange, \
Severity, SnapshotSoftware, TestDataStorageLength
from ereuse_devicehub.resources.enums import (
R_POSITIVE,
AppearanceRange,
BiosAccessRange,
FunctionalityRange,
PhysicalErasureMethod,
RatingRange,
Severity,
SnapshotSoftware,
TestDataStorageLength,
)
from ereuse_devicehub.resources.models import STR_BIG_SIZE, STR_SIZE
from ereuse_devicehub.resources.schemas import Thing
from ereuse_devicehub.resources.tradedocument import schemas as s_document
from ereuse_devicehub.resources.tradedocument.models import TradeDocument
from ereuse_devicehub.resources.user import schemas as s_user
from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.tradedocument.models import TradeDocument
class Action(Thing):
__doc__ = m.Action.__doc__
id = UUID(dump_only=True)
name = SanitizedStr(default='',
validate=Length(max=STR_BIG_SIZE),
description=m.Action.name.comment)
name = SanitizedStr(
default='', validate=Length(max=STR_BIG_SIZE), description=m.Action.name.comment
)
closed = Boolean(missing=True, description=m.Action.closed.comment)
severity = EnumField(Severity, description=m.Action.severity.comment)
description = SanitizedStr(default='', description=m.Action.description.comment)
@ -43,16 +65,18 @@ class Action(Thing):
agent = NestedOn(s_agent.Agent, description=m.Action.agent_id.comment)
author = NestedOn(s_user.User, dump_only=True, exclude=('token',))
components = NestedOn(s_device.Component, dump_only=True, many=True)
parent = NestedOn(s_device.Computer, dump_only=True, description=m.Action.parent_id.comment)
parent = NestedOn(
s_device.Computer, dump_only=True, description=m.Action.parent_id.comment
)
url = URL(dump_only=True, description=m.Action.url.__doc__)
@validates_schema
def validate_times(self, data: dict):
unix_time = datetime.fromisoformat("1970-01-02 00:00:00+00:00")
if 'end_time' in data and data['end_time'] < unix_time:
if 'end_time' in data and data['end_time'].replace(tzinfo=tzutc()) < unix_time:
data['end_time'] = unix_time
if 'start_time' in data and data['start_time'] < unix_time:
if 'start_time' in data and data['start_time'].replace(tzinfo=tzutc()) < unix_time:
data['start_time'] = unix_time
if data.get('end_time') and data.get('start_time'):
@ -67,24 +91,27 @@ class ActionWithOneDevice(Action):
class ActionWithMultipleDocuments(Action):
__doc__ = m.ActionWithMultipleTradeDocuments.__doc__
documents = NestedOn(s_document.TradeDocument,
documents = NestedOn(
s_document.TradeDocument,
many=True,
required=True, # todo test ensuring len(devices) >= 1
only_query='id',
collection_class=OrderedSet)
collection_class=OrderedSet,
)
class ActionWithMultipleDevices(Action):
__doc__ = m.ActionWithMultipleDevices.__doc__
devices = NestedOn(s_device.Device,
devices = NestedOn(
s_device.Device,
many=True,
required=True, # todo test ensuring len(devices) >= 1
only_query='id',
collection_class=OrderedSet)
collection_class=OrderedSet,
)
class ActionWithMultipleDevicesCheckingOwner(ActionWithMultipleDevices):
@post_load
def check_owner_of_device(self, data):
for dev in data['devices']:
@ -102,20 +129,29 @@ class Remove(ActionWithOneDevice):
class Allocate(ActionWithMultipleDevicesCheckingOwner):
__doc__ = m.Allocate.__doc__
start_time = DateTime(data_key='startTime', required=True,
description=m.Action.start_time.comment)
end_time = DateTime(data_key='endTime', required=False,
description=m.Action.end_time.comment)
final_user_code = SanitizedStr(data_key="finalUserCode",
start_time = DateTime(
data_key='startTime', required=True, description=m.Action.start_time.comment
)
end_time = DateTime(
data_key='endTime', required=False, description=m.Action.end_time.comment
)
final_user_code = SanitizedStr(
data_key="finalUserCode",
validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='This is a internal code for mainteing the secrets of the \
personal datas of the new holder')
transaction = SanitizedStr(validate=Length(min=1, max=STR_BIG_SIZE),
personal datas of the new holder',
)
transaction = SanitizedStr(
validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='The code used from the owner for \
relation with external tool.')
end_users = Integer(data_key='endUsers', validate=[Range(min=1, error="Value must be greater than 0")])
relation with external tool.',
)
end_users = Integer(
data_key='endUsers',
validate=[Range(min=1, error="Value must be greater than 0")],
)
@validates_schema
def validate_allocate(self, data: dict):
@ -136,12 +172,15 @@ class Allocate(ActionWithMultipleDevicesCheckingOwner):
class Deallocate(ActionWithMultipleDevicesCheckingOwner):
__doc__ = m.Deallocate.__doc__
start_time = DateTime(data_key='startTime', required=True,
description=m.Action.start_time.comment)
transaction = SanitizedStr(validate=Length(min=1, max=STR_BIG_SIZE),
start_time = DateTime(
data_key='startTime', required=True, description=m.Action.start_time.comment
)
transaction = SanitizedStr(
validate=Length(min=1, max=STR_BIG_SIZE),
required=False,
description='The code used from the owner for \
relation with external tool.')
relation with external tool.',
)
@validates_schema
def validate_deallocate(self, data: dict):
@ -232,7 +271,9 @@ class MeasureBattery(Test):
__doc__ = m.MeasureBattery.__doc__
size = Integer(required=True, description=m.MeasureBattery.size.comment)
voltage = Integer(required=True, description=m.MeasureBattery.voltage.comment)
cycle_count = Integer(data_key='cycleCount', description=m.MeasureBattery.cycle_count.comment)
cycle_count = Integer(
data_key='cycleCount', description=m.MeasureBattery.cycle_count.comment
)
health = EnumField(enums.BatteryHealth, description=m.MeasureBattery.health.comment)
@ -289,28 +330,32 @@ class TestBios(Test):
class VisualTest(Test):
__doc__ = m.VisualTest.__doc__
appearance_range = EnumField(AppearanceRange, data_key='appearanceRange')
functionality_range = EnumField(FunctionalityRange,
data_key='functionalityRange')
functionality_range = EnumField(FunctionalityRange, data_key='functionalityRange')
labelling = Boolean()
class Rate(ActionWithOneDevice):
__doc__ = m.Rate.__doc__
rating = Integer(validate=Range(*R_POSITIVE),
rating = Integer(
validate=Range(*R_POSITIVE), dump_only=True, description=m.Rate._rating.comment
)
version = Version(dump_only=True, description=m.Rate.version.comment)
appearance = Integer(
validate=Range(enums.R_NEGATIVE),
dump_only=True,
description=m.Rate._rating.comment)
version = Version(dump_only=True,
description=m.Rate.version.comment)
appearance = Integer(validate=Range(enums.R_NEGATIVE),
description=m.Rate._appearance.comment,
)
functionality = Integer(
validate=Range(enums.R_NEGATIVE),
dump_only=True,
description=m.Rate._appearance.comment)
functionality = Integer(validate=Range(enums.R_NEGATIVE),
dump_only=True,
description=m.Rate._functionality.comment)
rating_range = EnumField(RatingRange,
description=m.Rate._functionality.comment,
)
rating_range = EnumField(
RatingRange,
dump_only=True,
data_key='ratingRange',
description=m.Rate.rating_range.__doc__)
description=m.Rate.rating_range.__doc__,
)
class RateComputer(Rate):
@ -320,19 +365,25 @@ class RateComputer(Rate):
data_storage = Float(dump_only=True, data_key='dataStorage')
graphic_card = Float(dump_only=True, data_key='graphicCard')
data_storage_range = EnumField(RatingRange, dump_only=True, data_key='dataStorageRange')
data_storage_range = EnumField(
RatingRange, dump_only=True, data_key='dataStorageRange'
)
ram_range = EnumField(RatingRange, dump_only=True, data_key='ramRange')
processor_range = EnumField(RatingRange, dump_only=True, data_key='processorRange')
graphic_card_range = EnumField(RatingRange, dump_only=True, data_key='graphicCardRange')
graphic_card_range = EnumField(
RatingRange, dump_only=True, data_key='graphicCardRange'
)
class Price(ActionWithOneDevice):
__doc__ = m.Price.__doc__
currency = EnumField(Currency, required=True, description=m.Price.currency.comment)
price = Decimal(places=m.Price.SCALE,
price = Decimal(
places=m.Price.SCALE,
rounding=m.Price.ROUND,
required=True,
description=m.Price.price.comment)
description=m.Price.price.comment,
)
version = Version(dump_only=True, description=m.Price.version.comment)
rating = NestedOn(Rate, dump_only=True, description=m.Price.rating_id.comment)
@ -356,9 +407,11 @@ class EreusePrice(Price):
class Install(ActionWithOneDevice):
__doc__ = m.Install.__doc__
name = SanitizedStr(validate=Length(min=4, max=STR_BIG_SIZE),
name = SanitizedStr(
validate=Length(min=4, max=STR_BIG_SIZE),
required=True,
description='The name of the OS installed.')
description='The name of the OS installed.',
)
elapsed = TimeDelta(precision=TimeDelta.SECONDS, required=True)
address = Integer(validate=OneOf({8, 16, 32, 64, 128, 256}))
@ -372,18 +425,23 @@ class Snapshot(ActionWithOneDevice):
See docs for more info.
"""
uuid = UUID()
software = EnumField(SnapshotSoftware,
wbid = String(required=False)
software = EnumField(
SnapshotSoftware,
required=True,
description='The software that generated this Snapshot.')
description='The software that generated this Snapshot.',
)
version = Version(required=True, description='The version of the software.')
actions = NestedOn(Action, many=True, dump_only=True)
elapsed = TimeDelta(precision=TimeDelta.SECONDS)
components = NestedOn(s_device.Component,
components = NestedOn(
s_device.Component,
many=True,
description='A list of components that are inside of the device'
'at the moment of this Snapshot.'
'Order is preserved, so the component num 0 when'
'submitting is the component num 0 when returning it back.')
'submitting is the component num 0 when returning it back.',
)
@validates_schema
def validate_workbench_version(self, data: dict):
@ -391,16 +449,21 @@ class Snapshot(ActionWithOneDevice):
if data['version'] < app.config['MIN_WORKBENCH']:
raise ValidationError(
'Min. supported Workbench version is '
'{} but yours is {}.'.format(app.config['MIN_WORKBENCH'], data['version']),
field_names=['version']
'{} but yours is {}.'.format(
app.config['MIN_WORKBENCH'], data['version']
),
field_names=['version'],
)
@validates_schema
def validate_components_only_workbench(self, data: dict):
if (data['software'] != SnapshotSoftware.Workbench) and (data['software'] != SnapshotSoftware.WorkbenchAndroid):
if (data['software'] != SnapshotSoftware.Workbench) and (
data['software'] != SnapshotSoftware.WorkbenchAndroid
):
if data.get('components', None) is not None:
raise ValidationError('Only Workbench can add component info',
field_names=['components'])
raise ValidationError(
'Only Workbench can add component info', field_names=['components']
)
@validates_schema
def validate_only_workbench_fields(self, data: dict):
@ -408,22 +471,32 @@ class Snapshot(ActionWithOneDevice):
# todo test
if data['software'] == SnapshotSoftware.Workbench:
if not data.get('uuid', None):
raise ValidationError('Snapshots from Workbench and WorkbenchAndroid must have uuid',
field_names=['uuid'])
raise ValidationError(
'Snapshots from Workbench and WorkbenchAndroid must have uuid',
field_names=['uuid'],
)
if data.get('elapsed', None) is None:
raise ValidationError('Snapshots from Workbench must have elapsed',
field_names=['elapsed'])
raise ValidationError(
'Snapshots from Workbench must have elapsed',
field_names=['elapsed'],
)
elif data['software'] == SnapshotSoftware.WorkbenchAndroid:
if not data.get('uuid', None):
raise ValidationError('Snapshots from Workbench and WorkbenchAndroid must have uuid',
field_names=['uuid'])
raise ValidationError(
'Snapshots from Workbench and WorkbenchAndroid must have uuid',
field_names=['uuid'],
)
else:
if data.get('uuid', None):
raise ValidationError('Only Snapshots from Workbench or WorkbenchAndroid can have uuid',
field_names=['uuid'])
raise ValidationError(
'Only Snapshots from Workbench or WorkbenchAndroid can have uuid',
field_names=['uuid'],
)
if data.get('elapsed', None):
raise ValidationError('Only Snapshots from Workbench can have elapsed',
field_names=['elapsed'])
raise ValidationError(
'Only Snapshots from Workbench can have elapsed',
field_names=['elapsed'],
)
class ToRepair(ActionWithMultipleDevicesCheckingOwner):
@ -440,16 +513,20 @@ class Ready(ActionWithMultipleDevicesCheckingOwner):
class ActionStatus(Action):
rol_user = NestedOn(s_user.User, dump_only=True, exclude=('token',))
devices = NestedOn(s_device.Device,
devices = NestedOn(
s_device.Device,
many=True,
required=False, # todo test ensuring len(devices) >= 1
only_query='id',
collection_class=OrderedSet)
documents = NestedOn(s_document.TradeDocument,
collection_class=OrderedSet,
)
documents = NestedOn(
s_document.TradeDocument,
many=True,
required=False, # todo test ensuring len(devices) >= 1
only_query='id',
collection_class=OrderedSet)
collection_class=OrderedSet,
)
@pre_load
def put_devices(self, data: dict):
@ -508,20 +585,28 @@ class Live(ActionWithOneDevice):
See docs for more info.
"""
uuid = UUID()
software = EnumField(SnapshotSoftware,
software = EnumField(
SnapshotSoftware,
required=True,
description='The software that generated this Snapshot.')
description='The software that generated this Snapshot.',
)
version = Version(required=True, description='The version of the software.')
final_user_code = SanitizedStr(data_key="finalUserCode", dump_only=True)
licence_version = Version(required=True, description='The version of the software.')
components = NestedOn(s_device.Component,
components = NestedOn(
s_device.Component,
many=True,
description='A list of components that are inside of the device'
'at the moment of this Snapshot.'
'Order is preserved, so the component num 0 when'
'submitting is the component num 0 when returning it back.')
usage_time_allocate = TimeDelta(data_key='usageTimeAllocate', required=False,
precision=TimeDelta.HOURS, dump_only=True)
'submitting is the component num 0 when returning it back.',
)
usage_time_allocate = TimeDelta(
data_key='usageTimeAllocate',
required=False,
precision=TimeDelta.HOURS,
dump_only=True,
)
class Organize(ActionWithMultipleDevices):
@ -691,26 +776,23 @@ class Trade(ActionWithMultipleDevices):
validate=Length(max=STR_SIZE),
data_key='userToEmail',
missing='',
required=False
required=False,
)
user_to = NestedOn(s_user.User, dump_only=True, data_key='userTo')
user_from_email = SanitizedStr(
validate=Length(max=STR_SIZE),
data_key='userFromEmail',
missing='',
required=False
required=False,
)
user_from = NestedOn(s_user.User, dump_only=True, data_key='userFrom')
code = SanitizedStr(validate=Length(max=STR_SIZE), data_key='code', required=False)
confirm = Boolean(
data_key='confirms',
missing=True,
description="""If you need confirmation of the user you need actevate this field"""
description="""If you need confirmation of the user you need actevate this field""",
)
lot = NestedOn('Lot',
many=False,
required=True,
only_query='id')
lot = NestedOn('Lot', many=False, required=True, only_query='id')
@pre_load
def adding_devices(self, data: dict):

View File

@ -1,18 +1,21 @@
""" This is the view for Snapshots """
import os
import json
import os
import shutil
from datetime import datetime
from flask import current_app as app, g
from flask import current_app as app
from flask import g
from marshmallow import ValidationError
from sqlalchemy.util import OrderedSet
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import RateComputer, Snapshot
from ereuse_devicehub.parser.models import SnapshotErrors
from ereuse_devicehub.resources.action.models import Snapshot
from ereuse_devicehub.resources.device.models import Computer
from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate
from ereuse_devicehub.resources.enums import SnapshotSoftware, Severity
from ereuse_devicehub.resources.device.sync import Sync
from ereuse_devicehub.resources.enums import Severity, SnapshotSoftware
from ereuse_devicehub.resources.user.exceptions import InsufficientPermission
@ -59,48 +62,35 @@ def move_json(tmp_snapshots, path_name, user, live=False):
os.remove(path_name)
class SnapshotView():
"""Performs a Snapshot.
class SnapshotMix:
sync = Sync()
See `Snapshot` section in docs for more info.
"""
# Note that if we set the device / components into the snapshot
# model object, when we flush them to the db we will flush
# snapshot, and we want to wait to flush snapshot at the end
def __init__(self, snapshot_json: dict, resource_def, schema):
self.schema = schema
self.resource_def = resource_def
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
self.path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json.pop('debug', None)
self.snapshot_json = resource_def.schema.load(snapshot_json)
self.response = self.build()
move_json(self.tmp_snapshots, self.path_snapshot, g.user.email)
def post(self):
return self.response
def build(self):
device = self.snapshot_json.pop('device') # type: Computer
def build(self, snapshot_json=None): # noqa: C901
if not snapshot_json:
snapshot_json = self.snapshot_json
device = snapshot_json.pop('device') # type: Computer
components = None
if self.snapshot_json['software'] == (SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid):
components = self.snapshot_json.pop('components', None) # type: List[Component]
if snapshot_json['software'] == (
SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid
):
components = snapshot_json.pop('components', None) # type: List[Component]
if isinstance(device, Computer) and device.hid:
device.add_mac_to_hid(components_snap=components)
snapshot = Snapshot(**self.snapshot_json)
snapshot = Snapshot(**snapshot_json)
# Remove new actions from devices so they don't interfere with sync
actions_device = set(e for e in device.actions_one)
device.actions_one.clear()
if components:
actions_components = tuple(set(e for e in c.actions_one) for c in components)
actions_components = tuple(
set(e for e in c.actions_one) for c in components
)
for component in components:
component.actions_one.clear()
assert not device.actions_one
assert all(not c.actions_one for c in components) if components else True
db_device, remove_actions = self.resource_def.sync.run(device, components)
db_device, remove_actions = self.sync.run(device, components)
del device # Do not use device anymore
snapshot.device = db_device
@ -120,24 +110,49 @@ class SnapshotView():
# Check ownership of (non-component) device to from current.user
if db_device.owner_id != g.user.id:
raise InsufficientPermission()
# Compute ratings
try:
rate_computer, price = RateComputer.compute(db_device)
except CannotRate:
pass
else:
snapshot.actions.add(rate_computer)
if price:
snapshot.actions.add(price)
elif snapshot.software == SnapshotSoftware.WorkbenchAndroid:
pass # TODO try except to compute RateMobile
# Check if HID is null and add Severity:Warning to Snapshot
if snapshot.device.hid is None:
snapshot.severity = Severity.Warning
return snapshot
class SnapshotView(SnapshotMix):
"""Performs a Snapshot.
See `Snapshot` section in docs for more info.
"""
# Note that if we set the device / components into the snapshot
# model object, when we flush them to the db we will flush
# snapshot, and we want to wait to flush snapshot at the end
def __init__(self, snapshot_json: dict, resource_def, schema):
self.schema = schema
self.resource_def = resource_def
self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
self.path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
snapshot_json.pop('debug', None)
try:
self.snapshot_json = resource_def.schema.load(snapshot_json)
except ValidationError as err:
txt = "{}".format(err)
uuid = snapshot_json.get('uuid')
error = SnapshotErrors(
description=txt, snapshot_uuid=uuid, severity=Severity.Error
)
error.save(commit=True)
raise err
snapshot = self.build()
db.session.add(snapshot)
db.session().final_flush()
ret = self.schema.jsonify(snapshot) # transform it back
ret.status_code = 201
self.response = self.schema.jsonify(snapshot) # transform it back
self.response.status_code = 201
db.session.commit()
return ret
move_json(self.tmp_snapshots, self.path_snapshot, g.user.email)
def post(self):
return self.response

View File

@ -1,35 +1,49 @@
""" This is the view for Snapshots """
import jwt
import ereuse_utils
from datetime import timedelta
from distutils.version import StrictVersion
from uuid import UUID
from flask import current_app as app, request, g
import ereuse_utils
import jwt
from flask import current_app as app
from flask import g, request
from teal.db import ResourceNotFound
from teal.marshmallow import ValidationError
from teal.resource import View
from ereuse_devicehub.db import db
from ereuse_devicehub.query import things_response
from ereuse_devicehub.resources.action.models import (Action, Snapshot, VisualTest,
InitTransfer, Live, Allocate, Deallocate,
Trade, Confirm, Revoke)
from ereuse_devicehub.resources.action.models import (
Action,
Allocate,
Confirm,
Deallocate,
InitTransfer,
Live,
Revoke,
Snapshot,
Trade,
VisualTest,
)
from ereuse_devicehub.resources.action.views import trade as trade_view
from ereuse_devicehub.resources.action.views.snapshot import SnapshotView, save_json, move_json
from ereuse_devicehub.resources.action.views.documents import ErasedView
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
from ereuse_devicehub.resources.action.views.snapshot import (
SnapshotView,
move_json,
save_json,
)
from ereuse_devicehub.resources.device.models import Computer, DataStorage, Device
from ereuse_devicehub.resources.enums import Severity
SUPPORTED_WORKBENCH = StrictVersion('11.0')
class AllocateMix():
class AllocateMix:
model = None
def post(self):
""" Create one res_obj """
"""Create one res_obj"""
res_json = request.get_json()
res_obj = self.model(**res_json)
db.session.add(res_obj)
@ -40,13 +54,18 @@ class AllocateMix():
return ret
def find(self, args: dict):
res_objs = self.model.query.filter_by(author=g.user) \
.order_by(self.model.created.desc()) \
res_objs = (
self.model.query.filter_by(author=g.user)
.order_by(self.model.created.desc())
.paginate(per_page=200)
)
return things_response(
self.schema.dump(res_objs.items, many=True, nested=0),
res_objs.page, res_objs.per_page, res_objs.total,
res_objs.prev_num, res_objs.next_num
res_objs.page,
res_objs.per_page,
res_objs.total,
res_objs.prev_num,
res_objs.next_num,
)
@ -99,7 +118,9 @@ class LiveView(View):
if not serial_number:
"""There aren't any disk"""
raise ResourceNotFound("There aren't any disk in this device {}".format(device))
raise ResourceNotFound(
"There aren't any disk in this device {}".format(device)
)
return usage_time_hdd, serial_number
def get_hid(self, snapshot):
@ -109,8 +130,11 @@ class LiveView(View):
return None
if not components:
return device.hid
macs = [c.serial_number for c in components
if c.type == 'NetworkAdapter' and c.serial_number is not None]
macs = [
c.serial_number
for c in components
if c.type == 'NetworkAdapter' and c.serial_number is not None
]
macs.sort()
mac = ''
hid = device.hid
@ -124,12 +148,10 @@ class LiveView(View):
def live(self, snapshot):
"""If the device.allocated == True, then this snapshot create an action live."""
hid = self.get_hid(snapshot)
if not hid or not Device.query.filter(
Device.hid == hid).count():
if not hid or not Device.query.filter(Device.hid == hid).count():
raise ValidationError('Device not exist.')
device = Device.query.filter(
Device.hid == hid, Device.allocated == True).one()
device = Device.query.filter(Device.hid == hid, Device.allocated == True).one()
# Is not necessary
if not device:
raise ValidationError('Device not exist.')
@ -138,7 +160,8 @@ class LiveView(View):
usage_time_hdd, serial_number = self.get_hdd_details(snapshot, device)
data_live = {'usage_time_hdd': usage_time_hdd,
data_live = {
'usage_time_hdd': usage_time_hdd,
'serial_number': serial_number,
'snapshot_uuid': snapshot['uuid'],
'description': '',
@ -147,7 +170,8 @@ class LiveView(View):
'licence_version': snapshot['licence_version'],
'author_id': device.owner_id,
'agent_id': device.owner.individual.id,
'device': device}
'device': device,
}
live = Live(**data_live)
@ -172,7 +196,12 @@ class LiveView(View):
def decode_snapshot(data):
try:
return jwt.decode(data['data'], app.config['JWT_PASS'], algorithms="HS256", json_encoder=ereuse_utils.JSONEncoder)
return jwt.decode(
data['data'],
app.config['JWT_PASS'],
algorithms="HS256",
json_encoder=ereuse_utils.JSONEncoder,
)
except jwt.exceptions.InvalidSignatureError as err:
txt = 'Invalid snapshot'
raise ValidationError(txt)
@ -206,7 +235,7 @@ class ActionView(View):
# snapshot_data = decode_snapshot(json)
snapshot_data = json
if 'data' in json:
if 'data' in json and isinstance(json['data'], str):
snapshot_data = decode_snapshot(json)
if not snapshot_data:
@ -248,7 +277,9 @@ class ActionView(View):
return confirm.post()
if json['type'] == 'ConfirmRevokeDocument':
confirm_revoke = trade_view.ConfirmRevokeDocumentView(json, resource_def, self.schema)
confirm_revoke = trade_view.ConfirmRevokeDocumentView(
json, resource_def, self.schema
)
return confirm_revoke.post()
if json['type'] == 'DataWipe':

View File

@ -663,6 +663,7 @@ class Computer(Device):
db.ForeignKey(User.id),
nullable=True)
receiver = db.relationship(User, primaryjoin=receiver_id == User.id)
uuid = db.Column(UUID(as_uuid=True), nullable=True)
def __init__(self, *args, **kwargs) -> None:
if args:

View File

@ -136,6 +136,7 @@ class Computer(Device):
owner_id = UUID(data_key='ownerID')
transfer_state = EnumField(enums.TransferState, description=m.Computer.transfer_state.comment)
receiver_id = UUID(data_key='receiverID')
uuid = UUID(required=False)
class Desktop(Computer):

View File

@ -8,6 +8,7 @@ import inflection
@unique
class SnapshotSoftware(Enum):
"""The software used to perform the Snapshot."""
Workbench = 'Workbench'
WorkbenchAndroid = 'WorkbenchAndroid'
AndroidApp = 'AndroidApp'
@ -36,6 +37,7 @@ class RatingRange(IntEnum):
3. Medium.
4. High.
"""
VERY_LOW = 1
LOW = 2
MEDIUM = 3
@ -69,6 +71,7 @@ class PriceSoftware(Enum):
@unique
class AppearanceRange(Enum):
"""Grades the imperfections that aesthetically affect the device, but not its usage."""
Z = 'Z. The device is new'
A = 'A. Is like new; without visual damage'
B = 'B. Is in really good condition; small visual damage in difficult places to spot'
@ -83,6 +86,7 @@ class AppearanceRange(Enum):
@unique
class FunctionalityRange(Enum):
"""Grades the defects of a device that affect its usage."""
A = 'A. All the buttons works perfectly, no screen/camera defects and chassis without usage issues'
B = 'B. There is a button difficult to press or unstable it, a screen/camera defect or chassis problem'
C = 'C. Chassis defects or multiple buttons don\'t work; broken or unusable it, some screen/camera defect'
@ -95,6 +99,7 @@ class FunctionalityRange(Enum):
@unique
class BatteryHealthRange(Enum):
"""Grade the battery health status, depending on self report Android system"""
A = 'A. The battery health is very good'
B = 'B. Battery health is good'
C = 'C. Battery health is overheat / over voltage status but can stand the minimum duration'
@ -109,6 +114,7 @@ class BatteryHealthRange(Enum):
@unique
class BiosAccessRange(Enum):
"""How difficult it has been to set the bios to boot from the network."""
A = 'A. If by pressing a key you could access a boot menu with the network boot'
B = 'B. You had to get into the BIOS, and in less than 5 steps you could set the network boot'
C = 'C. Like B, but with more than 5 steps'
@ -139,6 +145,7 @@ class ImageSoftware(Enum):
@unique
class ImageMimeTypes(Enum):
"""Supported image Mimetypes for Devicehub."""
jpg = 'image/jpeg'
png = 'image/png'
@ -149,6 +156,7 @@ BOX_RATE_3 = 1, 3
# After looking at own databases
@unique
class RamInterface(Enum):
"""
@ -163,6 +171,7 @@ class RamInterface(Enum):
here for those cases where there is no more specific information.
Please, try to always use DDRø-6 denominations.
"""
SDRAM = 'SDRAM'
DDR = 'DDR SDRAM'
DDR2 = 'DDR2 SDRAM'
@ -170,6 +179,7 @@ class RamInterface(Enum):
DDR4 = 'DDR4 SDRAM'
DDR5 = 'DDR5 SDRAM'
DDR6 = 'DDR6 SDRAM'
LPDDR3 = 'LPDDR3'
def __str__(self):
return self.value
@ -189,6 +199,7 @@ class DataStorageInterface(Enum):
ATA = 'ATA'
USB = 'USB'
PCI = 'PCI'
NVME = 'NVME'
def __str__(self):
return self.value
@ -211,6 +222,7 @@ class DisplayTech(Enum):
@unique
class ComputerChassis(Enum):
"""The chassis of a computer."""
Tower = 'Tower'
Docking = 'Docking'
AllInOne = 'All in one'
@ -235,6 +247,7 @@ class ReceiverRole(Enum):
The role that the receiver takes in the reception;
the meaning of the reception.
"""
Intermediary = 'Generic user in the workflow of the device.'
FinalUser = 'The user that will use the device.'
CollectionPoint = 'A collection point.'
@ -244,6 +257,7 @@ class ReceiverRole(Enum):
class PrinterTechnology(Enum):
"""Technology of the printer."""
Toner = 'Toner / Laser'
Inkjet = 'Liquid inkjet'
SolidInk = 'Solid ink'
@ -260,6 +274,7 @@ class CameraFacing(Enum):
@unique
class BatteryHealth(Enum):
"""The battery health status as in Android."""
Cold = 'Cold'
Dead = 'Dead'
Good = 'Good'
@ -274,6 +289,7 @@ class BatteryTechnology(Enum):
https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-power
adding ``Alkaline``.
"""
LiIon = 'Lithium-ion'
NiCd = 'Nickel-Cadmium'
NiMH = 'Nickel-metal hydride'
@ -329,10 +345,11 @@ class PhysicalErasureMethod(Enum):
and non able to be re-built.
"""
Shred = 'Reduction of the data-storage to the required certified ' \
'standard sizes.'
Disintegration = 'Reduction of the data-storage to smaller sizes ' \
Shred = 'Reduction of the data-storage to the required certified ' 'standard sizes.'
Disintegration = (
'Reduction of the data-storage to smaller sizes '
'than the certified standard ones.'
)
def __str__(self):
return self.name
@ -362,20 +379,21 @@ class ErasureStandards(Enum):
def from_data_storage(cls, erasure) -> Set['ErasureStandards']:
"""Returns a set of erasure standards."""
from ereuse_devicehub.resources.action import models as actions
standards = set()
if isinstance(erasure, actions.EraseSectors):
with suppress(ValueError):
first_step, *other_steps = erasure.steps
if isinstance(first_step, actions.StepZero) \
and all(isinstance(step, actions.StepRandom) for step in other_steps):
if isinstance(first_step, actions.StepZero) and all(
isinstance(step, actions.StepRandom) for step in other_steps
):
standards.add(cls.HMG_IS5)
return standards
@unique
class TransferState(IntEnum):
"""State of transfer for a given Lot of devices.
"""
"""State of transfer for a given Lot of devices."""
"""
* Initial: No transfer action in place.

View File

@ -5,6 +5,7 @@ Use this as a starting point.
"""
from flask_wtf.csrf import CSRFProtect
from ereuse_devicehub.api.views import api
from ereuse_devicehub.config import DevicehubConfig
from ereuse_devicehub.devicehub import Devicehub
from ereuse_devicehub.inventory.views import devices
@ -15,6 +16,7 @@ app = Devicehub(inventory=DevicehubConfig.DB_SCHEMA)
app.register_blueprint(core)
app.register_blueprint(devices)
app.register_blueprint(labels)
app.register_blueprint(api)
# configure & enable CSRF of Flask-WTF
# NOTE: enable by blueprint to exclude API views

View File

@ -33,7 +33,7 @@ SQLAlchemy==1.3.24
SQLAlchemy-Utils==0.33.11
teal==0.2.0a38
webargs==5.5.3
Werkzeug==0.15.3
Werkzeug==0.15.5
sqlalchemy-citext==1.3.post0
flask-weasyprint==0.5
weasyprint==44
@ -43,3 +43,5 @@ tqdm==4.32.2
python-decouple==3.3
python-dotenv==0.14.0
pyjwt==2.0.0a1
pint==0.9
py-dmidecode==0.1.0

View File

@ -1,4 +1,5 @@
import io
import json
import uuid
import jwt
import ereuse_utils
@ -22,6 +23,7 @@ from ereuse_devicehub.resources.tag import Tag
from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.user.models import Session
from ereuse_devicehub.resources.enums import SessionType
from ereuse_devicehub.api.views import api
STARTT = datetime(year=2000, month=1, day=1, hour=1)
"""A dummy starting time to use in tests."""
@ -68,6 +70,7 @@ def app(request, _app: Devicehub) -> Devicehub:
tag_token=uuid.UUID('52dacef0-6bcb-4919-bfed-f10d2c96ecee'),
erase=False,
common=True)
_app.register_blueprint(api)
with _app.app_context():
try:
@ -166,6 +169,11 @@ def file(name: str) -> dict:
return json_encode(yaml2json(name))
def file_json(name):
with Path(__file__).parent.joinpath('files').joinpath(name).open() as f:
return json.loads(f.read())
def file_workbench(name: str) -> dict:
"""Opens and parses a YAML file from the ``files`` subdir."""
with Path(__file__).parent.joinpath('workbench_files').joinpath(name + '.json').open() as f:

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,2 +1,2 @@
Type,Chassis,Serial Number,Model,Manufacturer,Registered in,Physical state,Trading state,Price,Processor,RAM (MB),Data Storage Size (MB),Rate,Range,Processor Rate,Processor Range,RAM Rate,RAM Range,Data Storage Rate,Data Storage Range
Desktop,Microtower,d1s,d1ml,d1mr,Tue Jul 2 10:35:10 2019,,,,p1ml,0,0,1.0,Very low,1.0,Very low,1.0,Very low,1.0,Very low
Type;Chassis;Serial Number;Model;Manufacturer;Registered in;Physical state;Trading state;Price;Processor;RAM (MB);Data Storage Size (MB)
Desktop;Microtower;d1s;d1ml;d1mr;Tue Mar 29 18:13:05 2022;;;;p1ml;0;0

1 Type Chassis Serial Number Model Manufacturer Registered in Physical state Trading state Price Processor RAM (MB) Data Storage Size (MB) Rate Range Processor Rate Processor Range RAM Rate RAM Range Data Storage Rate Data Storage Range
2 Desktop Microtower d1s d1ml d1mr Tue Jul 2 10:35:10 2019 Tue Mar 29 18:13:05 2022 p1ml 0 0 1.0 Very low 1.0 Very low 1.0 Very low 1.0 Very low

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
{"device": {"dataStorageSize": 99, "serialNumber": "02:00:00:00:00:00", "model": "Motorola One Vision", "type": "Mobile", "ramSize": 31138, "displaySize": 9, "manufacturer": "Motorola"}, "software": "WorkbenchAndroid", "type": "Snapshot", "uuid": "958d697f-af34-4410-85d6-adb906d46161", "version": "0.0.2"}

View File

@ -30,6 +30,7 @@ def test_api_docs(client: Client):
assert set(docs['paths'].keys()) == {
'/actions/',
'/apidocs',
'/api/inventory/',
'/allocates/',
'/deallocates/',
'/deliverynotes/',

View File

@ -130,6 +130,7 @@ def test_physical_properties():
'model': 'foo',
'receiver_id': None,
'serial_number': 'foo-bar',
'uuid': None,
'transfer_state': TransferState.Initial
}
@ -480,7 +481,7 @@ def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserCli
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
pc, res = user.get(res=d.Device, item=s['device']['devicehubID'])
assert res.status_code == 200
assert len(pc['actions']) == 9
assert len(pc['actions']) == 7
html, _ = client.get(res=d.Device, item=s['device']['devicehubID'], accept=ANY)
assert 'intel atom cpu n270 @ 1.60ghz' in html

View File

@ -181,7 +181,7 @@ def test_device_query(user: UserClient):
assert i['url'] == '/devices/'
assert i['items'][0]['url'] == '/devices/%s' % snapshot['device']['devicehubID']
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
assert len(pc['actions']) == 4
assert len(pc['actions']) == 3
assert len(pc['components']) == 3
assert pc['tags'][0]['id'] == pc['devicehubID']

View File

@ -337,6 +337,7 @@ def test_export_computer_monitor(user: UserClient):
f = StringIO(csv_str)
obj_csv = csv.reader(f, f)
export_csv = list(obj_csv)
# Open fixture csv and transform to list
with Path(__file__).parent.joinpath('files').joinpath('computer-monitor.csv').open() \
as csv_file:
@ -435,12 +436,14 @@ def test_report_devices_stock_control(user: UserClient, user2: UserClient):
'Register in field is not a datetime'
# Pop dates fields from csv lists to compare them
fixture_csv[1] = fixture_csv[1][0].split(";")
fixture_csv[1] = fixture_csv[1][:5] + fixture_csv[1][6:]
export_csv[1] = export_csv[1][:5] + export_csv[1][6:]
assert fixture_csv[0] == export_csv[0], 'Headers are not equal'
export_header = [";".join(export_csv[0])]
assert fixture_csv[0] == export_header, 'Headers are not equal'
assert fixture_csv[1] == export_csv[1], 'Computer information are not equal'
assert fixture_csv == export_csv
assert fixture_csv == [export_header, export_csv[1]]
@pytest.mark.mvp

View File

@ -1,38 +1,49 @@
import os
import json
import os
import shutil
import pytest
import uuid
from datetime import datetime, timedelta, timezone
from requests.exceptions import HTTPError
from operator import itemgetter
from pathlib import Path
from typing import List, Tuple
from uuid import uuid4
import pytest
from boltons import urlutils
from teal.db import UniqueViolation, DBError
from teal.marshmallow import ValidationError
from ereuse_utils.test import ANY
from requests.exceptions import HTTPError
from teal.db import DBError, UniqueViolation
from teal.marshmallow import ValidationError
from ereuse_devicehub.client import UserClient
from ereuse_devicehub.db import db
from ereuse_devicehub.devicehub import Devicehub
from ereuse_devicehub.resources.action.models import Action, BenchmarkDataStorage, \
BenchmarkProcessor, EraseSectors, RateComputer, Snapshot, SnapshotRequest, VisualTest, \
EreusePrice, Ready
from ereuse_devicehub.parser.models import SnapshotErrors
from ereuse_devicehub.resources.action.models import (
Action,
BenchmarkDataStorage,
BenchmarkProcessor,
EraseSectors,
EreusePrice,
Ready,
Snapshot,
SnapshotRequest,
VisualTest,
)
from ereuse_devicehub.resources.action.views.snapshot import save_json
from ereuse_devicehub.resources.device import models as m
from ereuse_devicehub.resources.device.exceptions import NeedsId
from ereuse_devicehub.resources.device.models import SolidStateDrive
from ereuse_devicehub.resources.device.sync import MismatchBetweenProperties, \
MismatchBetweenTagsAndHid
from ereuse_devicehub.resources.device.sync import (
MismatchBetweenProperties,
MismatchBetweenTagsAndHid,
)
from ereuse_devicehub.resources.documents import documents
from ereuse_devicehub.resources.enums import ComputerChassis, SnapshotSoftware
from ereuse_devicehub.resources.tag import Tag
from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.action.views.snapshot import save_json
from ereuse_devicehub.resources.documents import documents
from tests.conftest import file, yaml2json, json_encode
from tests import conftest
from tests.conftest import file, file_json, json_encode, yaml2json
@pytest.mark.mvp
@ -43,18 +54,22 @@ def test_snapshot_model():
"""
device = m.Desktop(serial_number='a1', chassis=ComputerChassis.Tower)
# noinspection PyArgumentList
snapshot = Snapshot(uuid=uuid4(),
snapshot = Snapshot(
uuid=uuid4(),
end_time=datetime.now(timezone.utc),
version='1.0',
software=SnapshotSoftware.DesktopApp,
elapsed=timedelta(seconds=25))
elapsed=timedelta(seconds=25),
)
snapshot.device = device
snapshot.request = SnapshotRequest(request={'foo': 'bar'})
db.session.add(snapshot)
db.session.commit()
device = m.Desktop.query.one() # type: m.Desktop
e1 = device.actions[0]
assert isinstance(e1, Snapshot), 'Creation order must be preserved: 1. snapshot, 2. WR'
assert isinstance(
e1, Snapshot
), 'Creation order must be preserved: 1. snapshot, 2. WR'
db.session.delete(device)
db.session.commit()
assert Snapshot.query.one_or_none() is None
@ -63,7 +78,9 @@ def test_snapshot_model():
assert m.Desktop.query.one_or_none() is None
assert m.Device.query.one_or_none() is None
# Check properties
assert device.url == urlutils.URL('http://localhost/devices/%s' % device.devicehub_id)
assert device.url == urlutils.URL(
'http://localhost/devices/%s' % device.devicehub_id
)
@pytest.mark.mvp
@ -78,13 +95,12 @@ def test_snapshot_post(user: UserClient):
"""Tests the post snapshot endpoint (validation, etc), data correctness,
and relationship correctness.
"""
snapshot = snapshot_and_check(user, yaml2json('basic.snapshot'),
action_types=(
BenchmarkProcessor.t,
VisualTest.t,
RateComputer.t
),
perform_second_snapshot=False)
snapshot = snapshot_and_check(
user,
yaml2json('basic.snapshot'),
action_types=(BenchmarkProcessor.t, VisualTest.t),
perform_second_snapshot=False,
)
assert snapshot['software'] == 'Workbench'
assert snapshot['version'] == '11.0'
assert snapshot['uuid'] == 'f5efd26e-8754-46bc-87bf-fbccc39d60d9'
@ -98,14 +114,11 @@ def test_snapshot_post(user: UserClient):
device['components'].sort(key=key)
assert snapshot['components'] == device['components']
assert {c['type'] for c in snapshot['components']} == {m.GraphicCard.t, m.RamModule.t,
m.Processor.t}
rate = next(e for e in snapshot['actions'] if e['type'] == RateComputer.t)
rate, _ = user.get(res=Action, item=rate['id'])
assert rate['device']['id'] == snapshot['device']['id']
rate['components'].sort(key=key)
assert rate['components'] == snapshot['components']
assert rate['snapshot']['id'] == snapshot['id']
assert {c['type'] for c in snapshot['components']} == {
m.GraphicCard.t,
m.RamModule.t,
m.Processor.t,
}
@pytest.mark.mvp
@ -127,20 +140,26 @@ def test_same_device_tow_users(user: UserClient, user2: UserClient):
assert pc['ownerID'] != pc2['ownerID']
assert pc['hid'] == pc2['hid']
@pytest.mark.mvp
def test_snapshot_update_timefield_updated(user: UserClient):
"""
Tests for check if one computer have the time mark updated when one component of it is updated
"""
computer1 = yaml2json('1-device-with-components.snapshot')
snapshot = snapshot_and_check(user,
snapshot = snapshot_and_check(
user,
computer1,
action_types=(BenchmarkProcessor.t,
RateComputer.t),
perform_second_snapshot=False)
action_types=(BenchmarkProcessor.t,),
perform_second_snapshot=False,
)
computer2 = yaml2json('2-second-device-with-components-of-first.snapshot')
snapshot_and_check(user, computer2, action_types=('Remove', 'RateComputer'),
perform_second_snapshot=False)
snapshot_and_check(
user,
computer2,
action_types=('Remove',),
perform_second_snapshot=False,
)
pc1_devicehub_id = snapshot['device']['devicehubID']
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
assert pc1['updated'] != snapshot['device']['updated']
@ -165,7 +184,10 @@ def test_snapshot_power_on_hours(user: UserClient):
test_data_storage = ac
break
assert test_data_storage.lifetime.total_seconds()/3600 == test_data_storage.power_on_hours
assert (
test_data_storage.lifetime.total_seconds() / 3600
== test_data_storage.power_on_hours
)
@pytest.mark.mvp
@ -176,10 +198,7 @@ def test_snapshot_component_add_remove(user: UserClient):
def get_actions_info(actions: List[dict]) -> tuple:
return tuple(
(
e['type'],
[c['serialNumber'] for c in e['components']]
)
(e['type'], [c['serialNumber'] for c in e['components']])
for e in user.get_many(res=Action, resources=actions, key='id')
)
@ -198,15 +217,19 @@ def test_snapshot_component_add_remove(user: UserClient):
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
update1_pc1 = pc1['updated']
# Parent contains components
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c2s', 'p1c3s')
assert tuple(c['serialNumber'] for c in pc1['components']) == (
'p1c1s',
'p1c2s',
'p1c3s',
)
# Components contain parent
assert all(c['parent'] == pc1_id for c in pc1['components'])
# pc has three actions: Snapshot, BenchmarkProcessor and RateComputer
assert len(pc1['actions']) == 3
assert len(pc1['actions']) == 2
assert pc1['actions'][1]['type'] == Snapshot.t
# p1c1s has Snapshot
p1c1s, _ = user.get(res=m.Device, item=pc1['components'][0]['devicehubID'])
assert tuple(e['type'] for e in p1c1s['actions']) == ('Snapshot', 'RateComputer')
assert tuple(e['type'] for e in p1c1s['actions']) == ('Snapshot',)
# We register a new device
# It has the processor of the first one (p1c2s)
@ -228,23 +251,32 @@ def test_snapshot_component_add_remove(user: UserClient):
# PC1
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c3s')
assert all(c['parent'] == pc1_id for c in pc1['components'])
assert tuple(e['type'] for e in pc1['actions']) == ('BenchmarkProcessor', 'Snapshot', 'RateComputer', 'Remove')
assert tuple(e['type'] for e in pc1['actions']) == (
'BenchmarkProcessor',
'Snapshot',
'Remove',
)
# PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p1c2s', 'p2c1s')
assert all(c['parent'] == pc2_id for c in pc2['components'])
assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot', 'RateComputer')
assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot',)
# p1c2s has two Snapshots, a Remove and an Add
p1c2s, _ = user.get(res=m.Device, item=pc2['components'][0]['devicehubID'])
assert tuple(e['type'] for e in p1c2s['actions']) == (
'BenchmarkProcessor', 'Snapshot', 'RateComputer', 'Snapshot', 'Remove', 'RateComputer'
'BenchmarkProcessor',
'Snapshot',
'Snapshot',
'Remove',
)
# We register the first device again, but removing motherboard
# and moving processor from the second device to the first.
# We have created 1 Remove (from PC2's processor back to PC1)
# PC 0: p1c2s, p1c3s. PC 1: p2c1s
s3 = yaml2json('3-first-device-but-removing-motherboard-and-adding-processor-from-2.snapshot')
snapshot_and_check(user, s3, ('Remove', 'RateComputer'), perform_second_snapshot=False)
s3 = yaml2json(
'3-first-device-but-removing-motherboard-and-adding-processor-from-2.snapshot'
)
snapshot_and_check(user, s3, ('Remove',), perform_second_snapshot=False)
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated
@ -260,54 +292,49 @@ def test_snapshot_component_add_remove(user: UserClient):
# id, type, components, snapshot
('BenchmarkProcessor', []), # first BenchmarkProcessor
('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s']), # first Snapshot1
('RateComputer', ['p1c1s', 'p1c2s', 'p1c3s']),
('Remove', ['p1c2s']), # Remove Processor in Snapshot2
('Snapshot', ['p1c2s', 'p1c3s']), # This Snapshot3
('RateComputer', ['p1c2s', 'p1c3s'])
)
# PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',)
assert all(c['parent'] == pc2_id for c in pc2['components'])
assert tuple(e['type'] for e in pc2['actions']) == (
'Snapshot', # Second Snapshot
'RateComputer',
'Remove' # the processor we added in 2.
'Remove', # the processor we added in 2.
)
# p1c2s has Snapshot, Remove and Add
p1c2s, _ = user.get(res=m.Device, item=pc1['components'][0]['devicehubID'])
assert tuple(get_actions_info(p1c2s['actions'])) == (
('BenchmarkProcessor', []), # first BenchmarkProcessor
('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s']), # First Snapshot to PC1
('RateComputer', ['p1c1s', 'p1c2s', 'p1c3s']),
('Snapshot', ['p1c2s', 'p2c1s']), # Second Snapshot to PC2
('Remove', ['p1c2s']), # ...which caused p1c2s to be removed form PC1
('RateComputer', ['p1c2s', 'p2c1s']),
('Snapshot', ['p1c2s', 'p1c3s']), # The third Snapshot to PC1
('Remove', ['p1c2s']), # ...which caused p1c2 to be removed from PC2
('RateComputer', ['p1c2s', 'p1c3s'])
)
# We register the first device but without the processor,
# adding a graphic card and adding a new component
s4 = yaml2json('4-first-device-but-removing-processor.snapshot-and-adding-graphic-card')
snapshot4 = snapshot_and_check(user, s4, ('RateComputer',), perform_second_snapshot=False)
s4 = yaml2json(
'4-first-device-but-removing-processor.snapshot-and-adding-graphic-card'
)
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated
update3_pc2 = pc2['updated']
update4_pc1 = pc1['updated']
assert not update4_pc1 in [update1_pc1, update2_pc1, update3_pc1]
assert update4_pc1 in [update1_pc1, update2_pc1, update3_pc1]
assert update3_pc2 == update2_pc2
# PC 0: p1c3s, p1c4s. PC1: p2c1s
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s'}
assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'}
assert all(c['parent'] == pc1_id for c in pc1['components'])
# This last Action only
assert get_actions_info(pc1['actions'])[-1] == ('RateComputer', ['p1c3s'])
# PC2
# We haven't changed PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',)
assert all(c['parent'] == pc2_id for c in pc2['components'])
@pytest.mark.mvp
def test_snapshot_post_without_hid(user: UserClient):
"""Tests the post snapshot endpoint (validation, etc), data correctness,
@ -338,15 +365,16 @@ def test_snapshot_tag_inner_tag(user: UserClient, tag_id: str, app: Devicehub):
b = yaml2json('basic.snapshot')
b['device']['tags'] = [{'type': 'Tag', 'id': tag_id}]
snapshot_and_check(user, b,
action_types=(RateComputer.t, BenchmarkProcessor.t, VisualTest.t))
snapshot_and_check(user, b, action_types=(BenchmarkProcessor.t, VisualTest.t))
with app.app_context():
tag = Tag.query.all()[0] # type: Tag
assert tag.device_id == 3, 'Tag should be linked to the first device'
@pytest.mark.mvp
def test_snapshot_tag_inner_tag_mismatch_between_tags_and_hid(user: UserClient, tag_id: str):
def test_snapshot_tag_inner_tag_mismatch_between_tags_and_hid(
user: UserClient, tag_id: str
):
"""Ensures one device cannot 'steal' the tag from another one."""
pc1 = yaml2json('basic.snapshot')
pc1['device']['tags'] = [{'type': 'Tag', 'id': tag_id}]
@ -396,7 +424,7 @@ def test_snapshot_component_containing_components(user: UserClient):
'type': 'Processor',
'serialNumber': 'foo',
'manufacturer': 'bar',
'model': 'baz'
'model': 'baz',
}
user.post(json_encode(s), res=Snapshot, status=ValidationError)
@ -435,13 +463,15 @@ def test_not_remove_ram_in_same_computer(user: UserClient):
snap1, _ = user.post(json_encode(s), res=Snapshot)
s['uuid'] = '74caa7eb-2bad-4333-94f6-6f1b031d0774'
s['components'].append({
s['components'].append(
{
"actions": [],
"manufacturer": "Intel Corporation",
"model": "NM10/ICH7 Family High Definition Audio Controller",
"serialNumber": "mp2pc",
"type": "SoundCard"
})
"type": "SoundCard",
}
)
dev1 = m.Device.query.filter_by(id=snap1['device']['id']).one()
ram1 = [x.id for x in dev1.components if x.type == 'RamModule'][0]
snap2, _ = user.post(json_encode(s), res=Snapshot)
@ -455,28 +485,6 @@ def test_not_remove_ram_in_same_computer(user: UserClient):
assert dev1.components == dev2.components
@pytest.mark.mvp
def test_ereuse_price(user: UserClient):
"""Tests a Snapshot with EraseSectors and the resulting privacy
properties.
This tests ensures that only the last erasure is picked up, as
erasures have always custom endTime value set.
"""
s = yaml2json('erase-sectors.snapshot')
assert s['components'][0]['actions'][0]['endTime'] == '2018-06-01T09:12:06+02:00'
s['device']['type'] = 'Server'
snapshot = snapshot_and_check(user, s, action_types=(
EraseSectors.t,
BenchmarkDataStorage.t,
BenchmarkProcessor.t,
RateComputer.t,
EreusePrice.t
), perform_second_snapshot=False)
ereuse_price = snapshot['actions'][-1]
assert len(ereuse_price) > 0
@pytest.mark.mvp
def test_erase_privacy_standards_endtime_sort(user: UserClient):
"""Tests a Snapshot with EraseSectors and the resulting privacy
@ -487,33 +495,48 @@ def test_erase_privacy_standards_endtime_sort(user: UserClient):
"""
s = yaml2json('erase-sectors.snapshot')
assert s['components'][0]['actions'][0]['endTime'] == '2018-06-01T09:12:06+02:00'
snapshot = snapshot_and_check(user, s, action_types=(
snapshot = snapshot_and_check(
user,
s,
action_types=(
EraseSectors.t,
BenchmarkDataStorage.t,
BenchmarkProcessor.t,
RateComputer.t,
EreusePrice.t
), perform_second_snapshot=False)
),
perform_second_snapshot=False,
)
# Perform a new snapshot changing the erasure time, as if
# it is a new erasure performed after.
erase = next(e for e in snapshot['actions'] if e['type'] == EraseSectors.t)
assert erase['endTime'] == '2018-06-01T07:12:06+00:00'
s['uuid'] = uuid4()
s['components'][0]['actions'][0]['endTime'] = '2018-06-01T07:14:00+00:00'
snapshot = snapshot_and_check(user, s, action_types=(
snapshot = snapshot_and_check(
user,
s,
action_types=(
EraseSectors.t,
BenchmarkDataStorage.t,
BenchmarkProcessor.t,
RateComputer.t,
EreusePrice.t
), perform_second_snapshot=False)
),
perform_second_snapshot=False,
)
# The actual test
storage = next(e for e in snapshot['components'] if e['type'] == SolidStateDrive.t)
storage, _ = user.get(res=m.Device, item=storage['devicehubID']) # Let's get storage actions too
storage, _ = user.get(
res=m.Device, item=storage['devicehubID']
) # Let's get storage actions too
# order: endTime ascending
# erasure1/2 have an user defined time and others actions endTime = created
erasure1, erasure2, benchmark_hdd1, _snapshot1, _, _, benchmark_hdd2, _snapshot2 = storage['actions'][:8]
(
erasure1,
erasure2,
benchmark_hdd1,
_snapshot1,
benchmark_hdd2,
_snapshot2,
) = storage['actions'][:8]
assert erasure1['type'] == erasure2['type'] == 'EraseSectors'
assert benchmark_hdd1['type'] == benchmark_hdd2['type'] == 'BenchmarkDataStorage'
assert _snapshot1['type'] == _snapshot2['type'] == 'Snapshot'
@ -555,8 +578,7 @@ def test_test_data_storage(user: UserClient):
s = file('erase-sectors-2-hdd.snapshot')
snapshot, _ = user.post(res=Snapshot, data=s)
incidence_test = next(
ev for ev in snapshot['actions']
if ev.get('reallocatedSectorCount', None) == 15
ev for ev in snapshot['actions'] if ev.get('reallocatedSectorCount', None) == 15
)
assert incidence_test['severity'] == 'Error'
@ -584,10 +606,12 @@ def assert_similar_components(components1: List[dict], components2: List[dict]):
assert_similar_device(c1, c2)
def snapshot_and_check(user: UserClient,
def snapshot_and_check(
user: UserClient,
input_snapshot: dict,
action_types: Tuple[str, ...] = tuple(),
perform_second_snapshot=True) -> dict:
perform_second_snapshot=True,
) -> dict:
"""Performs a Snapshot and then checks if the result is ok:
- There have been performed the types of actions and in the same
@ -610,18 +634,22 @@ def snapshot_and_check(user: UserClient,
if action['type'] == 'Add':
found_add = True
if found_add:
assert action['type'] != 'Receive', 'All Remove actions must be before the Add ones'
assert (
action['type'] != 'Receive'
), 'All Remove actions must be before the Add ones'
assert input_snapshot['device']
assert_similar_device(input_snapshot['device'], snapshot['device'])
if input_snapshot.get('components', None):
assert_similar_components(input_snapshot['components'], snapshot['components'])
assert all(c['parent'] == snapshot['device']['id'] for c in snapshot['components']), \
'Components must be in their parent'
assert all(
c['parent'] == snapshot['device']['id'] for c in snapshot['components']
), 'Components must be in their parent'
if perform_second_snapshot:
if 'uuid' in input_snapshot:
input_snapshot['uuid'] = uuid4()
return snapshot_and_check(user, input_snapshot, action_types,
perform_second_snapshot=False)
return snapshot_and_check(
user, input_snapshot, action_types, perform_second_snapshot=False
)
else:
return snapshot
@ -642,12 +670,12 @@ def test_erase_changing_hdd_between_pcs(user: UserClient):
db.session.commit()
assert dev2.components[1].actions[2].parent == dev1
doc1, response = user.get(res=documents.DocumentDef.t,
item='erasures/{}'.format(dev1.id),
accept=ANY)
doc2, response = user.get(res=documents.DocumentDef.t,
item='erasures/{}'.format(dev2.id),
accept=ANY)
doc1, response = user.get(
res=documents.DocumentDef.t, item='erasures/{}'.format(dev1.id), accept=ANY
)
doc2, response = user.get(
res=documents.DocumentDef.t, item='erasures/{}'.format(dev2.id), accept=ANY
)
assert 'dev1' in doc2
assert 'dev2' in doc2
@ -667,10 +695,9 @@ def test_pc_2(user: UserClient):
snapshot, _ = user.post(res=Snapshot, data=s)
@pytest.mark.mvp
def test_save_snapshot_in_file(app: Devicehub, user: UserClient):
""" This test check if works the function save_snapshot_in_file """
"""This test check if works the function save_snapshot_in_file"""
snapshot_no_hid = yaml2json('basic.snapshot.nohid')
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
@ -696,7 +723,7 @@ def test_save_snapshot_in_file(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_action_no_snapshot_without_save_file(app: Devicehub, user: UserClient):
""" This test check if the function save_snapshot_in_file not work when we
"""This test check if the function save_snapshot_in_file not work when we
send one other action different to snapshot
"""
s = file('laptop-hp_255_g3_notebook-hewlett-packard-cnd52270fw.snapshot')
@ -712,9 +739,10 @@ def test_action_no_snapshot_without_save_file(app: Devicehub, user: UserClient):
assert os.path.exists(tmp_snapshots) == False
@pytest.mark.mvp
def test_save_snapshot_with_debug(app: Devicehub, user: UserClient):
""" This test check if works the function save_snapshot_in_file """
"""This test check if works the function save_snapshot_in_file"""
snapshot_file = yaml2json('basic.snapshot.with_debug')
debug = snapshot_file['debug']
user.post(res=Snapshot, data=json_encode(snapshot_file))
@ -738,7 +766,7 @@ def test_save_snapshot_with_debug(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_backup_snapshot_with_errors(app: Devicehub, user: UserClient):
""" This test check if the file snapshot is create when some snapshot is wrong """
"""This test check if the file snapshot is create when some snapshot is wrong"""
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_no_hid = yaml2json('basic.snapshot.badly_formed')
@ -763,7 +791,7 @@ def test_backup_snapshot_with_errors(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_snapshot_failed_missing_cpu_benchmark(app: Devicehub, user: UserClient):
""" This test check if the file snapshot is create when some snapshot is wrong """
"""This test check if the file snapshot is create when some snapshot is wrong"""
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_error = yaml2json('failed.snapshot.500.missing-cpu-benchmark')
@ -788,7 +816,7 @@ def test_snapshot_failed_missing_cpu_benchmark(app: Devicehub, user: UserClient)
@pytest.mark.mvp
def test_snapshot_failed_missing_hdd_benchmark(app: Devicehub, user: UserClient):
""" This test check if the file snapshot is create when some snapshot is wrong """
"""This test check if the file snapshot is create when some snapshot is wrong"""
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_error = yaml2json('failed.snapshot.500.missing-hdd-benchmark')
@ -813,7 +841,7 @@ def test_snapshot_failed_missing_hdd_benchmark(app: Devicehub, user: UserClient)
@pytest.mark.mvp
def test_snapshot_not_failed_null_chassis(app: Devicehub, user: UserClient):
""" This test check if the file snapshot is create when some snapshot is wrong """
"""This test check if the file snapshot is create when some snapshot is wrong"""
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_error = yaml2json('desktop-9644w8n-lenovo-0169622.snapshot')
@ -831,7 +859,7 @@ def test_snapshot_not_failed_null_chassis(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_snapshot_failed_missing_chassis(app: Devicehub, user: UserClient):
""" This test check if the file snapshot is create when some snapshot is wrong """
"""This test check if the file snapshot is create when some snapshot is wrong"""
tmp_snapshots = app.config['TMP_SNAPSHOTS']
path_dir_base = os.path.join(tmp_snapshots, user.user['email'], 'errors')
snapshot_error = yaml2json('failed.snapshot.422.missing-chassis')
@ -856,7 +884,7 @@ def test_snapshot_failed_missing_chassis(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_snapshot_failed_end_time_bug(app: Devicehub, user: UserClient):
""" This test check if the end_time = 0001-01-01 00:00:00+00:00
"""This test check if the end_time = 0001-01-01 00:00:00+00:00
and then we get a /devices, this create a crash
"""
snapshot_file = file('asus-end_time_bug88.snapshot')
@ -870,9 +898,10 @@ def test_snapshot_failed_end_time_bug(app: Devicehub, user: UserClient):
tmp_snapshots = app.config['TMP_SNAPSHOTS']
shutil.rmtree(tmp_snapshots)
@pytest.mark.mvp
def test_snapshot_not_failed_end_time_bug(app: Devicehub, user: UserClient):
""" This test check if the end_time != 0001-01-01 00:00:00+00:00
"""This test check if the end_time != 0001-01-01 00:00:00+00:00
and then we get a /devices, this create a crash
"""
snapshot_file = yaml2json('asus-end_time_bug88.snapshot')
@ -891,7 +920,7 @@ def test_snapshot_not_failed_end_time_bug(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_snapshot_bug_smallint_hdd(app: Devicehub, user: UserClient):
""" This test check if the end_time != 0001-01-01 00:00:00+00:00
"""This test check if the end_time != 0001-01-01 00:00:00+00:00
and then we get a /devices, this create a crash
"""
snapshot_file = file('asus-eee-1000h.snapshot.bug1857')
@ -907,7 +936,7 @@ def test_snapshot_bug_smallint_hdd(app: Devicehub, user: UserClient):
@pytest.mark.mvp
def test_snapshot_mobil(app: Devicehub, user: UserClient):
""" This test check if the end_time != 0001-01-01 00:00:00+00:00
"""This test check if the end_time != 0001-01-01 00:00:00+00:00
and then we get a /devices, this create a crash
"""
snapshot_file = file('mobil')
@ -926,3 +955,178 @@ def test_bug_141(user: UserClient):
"""
dev = file('2021-5-4-13-41_time_out_test_datastorage')
user.post(dev, res=Snapshot)
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_snapshot_wb_lite(user: UserClient):
"""This test check the minimum validation of json that come from snapshot"""
snapshot = file_json(
"2022-03-31_17h18m51s_ZQMPKKX51K67R68VO2X9RNZL08JPL_snapshot.json"
)
body, res = user.post(snapshot, uri="/api/inventory/")
ssd = [x for x in body['components'] if x['type'] == 'SolidStateDrive'][0]
assert body['device']['manufacturer'] == 'lenovo'
# assert body['wbid'] == "LXVC"
assert ssd['serialNumber'] == 's35anx0j401001'
assert res.status == '201 CREATED'
assert '00:28:f8:a6:d5:7e' in body['device']['hid']
dev = m.Device.query.filter_by(id=body['device']['id']).one()
assert dev.actions[0].power_on_hours == 6032
errors = SnapshotErrors.query.filter().all()
assert errors == []
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_snapshot_wb_lite_qemu(user: UserClient):
"""This test check the minimum validation of json that come from snapshot"""
snapshot = file_json(
"2022-04-01_06h28m54s_YKPZ27NJ2NMRO4893M4L5NRZV5YJ1_snapshot.json"
)
# body, res = user.post(snapshot, res=Snapshot)
body, res = user.post(snapshot, uri="/api/inventory/")
assert body['wbid'] == "YKPZ27NJ2NMRO4893M4L5NRZV5YJ1"
assert res.status == '201 CREATED'
dev = m.Device.query.filter_by(id=body['device']['id']).one()
assert dev.manufacturer == 'qemu'
assert dev.model == 'standard'
assert dev.serial_number is None
assert dev.hid is None
assert dev.actions[0].power_on_hours == 0
assert dev.actions[1].power_on_hours == 0
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_snapshot_wb_lite_old_snapshots(user: UserClient):
"""This test check the minimum validation of json that come from snapshot"""
wb_dir = Path(__file__).parent.joinpath('files/wb_lite/')
for f in os.listdir(wb_dir):
file_name = "wb_lite/{}".format(f)
snapshot_11 = file_json(file_name)
if not snapshot_11.get('debug'):
continue
lshw = snapshot_11['debug']['lshw']
hwinfo = snapshot_11['debug']['hwinfo']
snapshot_lite = {
'timestamp': snapshot_11['endTime'],
'type': 'Snapshot',
'uuid': str(uuid.uuid4()),
'wbid': 'MLKO1',
'software': 'Workbench',
'version': '2022.03.00',
"schema_api": "1.0.0",
'data': {
'lshw': lshw,
'hwinfo': hwinfo,
'smart': [],
'dmidecode': '',
'lspci': '',
},
}
body11, res = user.post(snapshot_11, res=Snapshot)
bodyLite, res = user.post(snapshot_lite, uri="/api/inventory/")
components11 = []
componentsLite = []
for c in body11.get('components', []):
if c['type'] in ["HardDrive", "SolidStateDrive"]:
continue
components11.append({c.get('model'), c['type'], c.get('manufacturer')})
for c in bodyLite.get('components', []):
componentsLite.append({c.get('model'), c['type'], c.get('manufacturer')})
try:
assert body11['device'].get('hid') == bodyLite['device'].get('hid')
if body11['device'].get('hid'):
assert body11['device']['id'] == bodyLite['device']['id']
assert body11['device'].get('serialNumber') == bodyLite['device'].get(
'serialNumber'
)
assert body11['device'].get('model') == bodyLite['device'].get('model')
assert body11['device'].get('manufacturer') == bodyLite['device'].get(
'manufacturer'
)
# wbLite can find more components than wb11
assert len(components11) <= len(componentsLite)
for c in components11:
assert c in componentsLite
except Exception as err:
# import pdb; pdb.set_trace()
raise err
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_snapshot_errors(user: UserClient):
"""This test check the minimum validation of json that come from snapshot"""
snapshot_11 = file_json('snapshotErrors.json')
lshw = snapshot_11['debug']['lshw']
hwinfo = snapshot_11['debug']['hwinfo']
snapshot_lite = {
'timestamp': snapshot_11['endTime'],
'type': 'Snapshot',
'uuid': str(uuid.uuid4()),
'wbid': 'MLKO1',
'software': 'Workbench',
'version': '2022.03.00',
"schema_api": "1.0.0",
'data': {
'lshw': lshw,
'hwinfo': hwinfo,
'smart': [],
'dmidecode': '',
'lspci': '',
},
}
assert SnapshotErrors.query.all() == []
body11, res = user.post(snapshot_11, res=Snapshot)
assert SnapshotErrors.query.all() == []
bodyLite, res = user.post(snapshot_lite, uri="/api/inventory/")
assert len(SnapshotErrors.query.all()) == 2
assert body11['device'].get('hid') == bodyLite['device'].get('hid')
assert body11['device']['id'] == bodyLite['device']['id']
assert body11['device'].get('serialNumber') == bodyLite['device'].get(
'serialNumber'
)
assert body11['device'].get('model') == bodyLite['device'].get('model')
assert body11['device'].get('manufacturer') == bodyLite['device'].get(
'manufacturer'
)
components11 = []
componentsLite = []
for c in body11['components']:
if c['type'] == "HardDrive":
continue
components11.append({c['model'], c['type'], c['manufacturer']})
for c in bodyLite['components']:
componentsLite.append({c['model'], c['type'], c['manufacturer']})
assert len(components11) == len(componentsLite)
for c in components11:
assert c in componentsLite
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_snapshot_errors_timestamp(user: UserClient):
"""This test check the minimum validation of json that come from snapshot"""
snapshot_lite = file_json('snapshot-error-timestamp.json')
bodyLite, res = user.post(snapshot_lite, uri="/api/inventory/")
assert res.status_code == 201
assert len(SnapshotErrors.query.all()) == 1
error = SnapshotErrors.query.all()[0]
assert snapshot_lite['wbid'] == error.wbid
assert user.user['id'] == str(error.owner_id)

View File

@ -41,7 +41,6 @@ def test_workbench_server_condensed(user: UserClient):
('BenchmarkProcessorSysbench', cpu_id),
('StressTest', pc_id),
('EraseSectors', ssd_id),
('EreusePrice', pc_id),
('BenchmarkRamSysbench', pc_id),
('BenchmarkProcessor', cpu_id),
('Install', ssd_id),
@ -49,7 +48,6 @@ def test_workbench_server_condensed(user: UserClient):
('BenchmarkDataStorage', ssd_id),
('BenchmarkDataStorage', hdd_id),
('TestDataStorage', ssd_id),
('RateComputer', pc_id)
}
assert snapshot['closed']
assert snapshot['severity'] == 'Info'
@ -61,10 +59,6 @@ def test_workbench_server_condensed(user: UserClient):
assert device['networkSpeeds'] == [1000, 58]
assert device['processorModel'] == device['components'][3]['model'] == 'p1-1ml'
assert device['ramSize'] == 2048, 'There are 3 RAM: 2 x 1024 and 1 None sizes'
assert device['rate']['closed']
assert device['rate']['severity'] == 'Info'
assert device['rate']['rating'] == 1
assert device['rate']['type'] == RateComputer.t
# TODO JN why haven't same order in actions on each execution?
assert any([ac['type'] in [BenchmarkProcessor.t, BenchmarkRamSysbench.t] for ac in device['actions']])
assert 'tag1' in [x['id'] for x in device['tags']]
@ -145,8 +139,6 @@ def test_real_hp_11(user: UserClient):
assert pc['hid'] == 'desktop-hewlett-packard-hp_compaq_8100_elite_sff-czc0408yjg-6c:62:6d:81:22:9f'
assert pc['chassis'] == 'Tower'
assert set(e['type'] for e in snapshot['actions']) == {
'EreusePrice',
'RateComputer',
'BenchmarkDataStorage',
'BenchmarkProcessor',
'BenchmarkProcessorSysbench',
@ -156,7 +148,8 @@ def test_real_hp_11(user: UserClient):
'TestBios',
'VisualTest'
}
assert len(list(e['type'] for e in snapshot['actions'])) == 10
assert len(list(e['type'] for e in snapshot['actions'])) == 8
assert pc['networkSpeeds'] == [1000, None], 'Device has no WiFi'
assert pc['processorModel'] == 'intel core i3 cpu 530 @ 2.93ghz'
assert pc['ramSize'] == 8192
@ -175,6 +168,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
"""Checks the values of the device, components,
actions and their relationships of a real pc.
"""
# import pdb; pdb.set_trace()
s = file('real-eee-1001pxd.snapshot.11')
snapshot, _ = user.post(res=em.Snapshot, data=s)
pc, _ = user.get(res=Device, item=snapshot['device']['devicehubID'])
@ -186,19 +180,10 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert pc['hid'] == 'laptop-asustek_computer_inc-1001pxd-b8oaas048286-14:da:e9:42:f6:7c'
assert len(pc['tags']) == 1
assert pc['networkSpeeds'] == [100, 0], 'Although it has WiFi we do not know the speed'
assert pc['rate']
rate = pc['rate']
# assert pc['actions'][0]['appearanceRange'] == 'A'
# assert pc['actions'][0]['functionalityRange'] == 'B'
# TODO add appearance and functionality Range in device[rate]
assert rate['processorRange'] == 'LOW'
assert rate['ramRange'] == 'LOW'
assert rate['ratingRange'] == 'LOW'
assert rate['ram'] == 1.53
# TODO add camelCase instead of snake_case
assert rate['dataStorage'] == 3.76
assert rate['type'] == 'RateComputer'
components = snapshot['components']
wifi = components[0]
assert wifi['hid'] == 'networkadapter-qualcomm_atheros-' \
@ -232,7 +217,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert em.BenchmarkRamSysbench.t in action_types
assert em.StressTest.t in action_types
assert em.Snapshot.t in action_types
assert len(actions) == 8
assert len(actions) == 6
gpu = components[3]
assert gpu['model'] == 'atom processor d4xx/d5xx/n4xx/n5xx integrated graphics controller'
assert gpu['manufacturer'] == 'intel corporation'
@ -242,7 +227,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert em.BenchmarkRamSysbench.t in action_types
assert em.StressTest.t in action_types
assert em.Snapshot.t in action_types
assert len(action_types) == 6
assert len(action_types) == 4
sound = components[4]
assert sound['model'] == 'nm10/ich7 family high definition audio controller'
sound = components[5]
@ -264,7 +249,7 @@ def test_snapshot_real_eee_1001pxd_with_rate(user: UserClient):
assert em.TestDataStorage.t in action_types
assert em.EraseBasic.t in action_types
assert em.Snapshot.t in action_types
assert len(action_types) == 9
assert len(action_types) == 7
erase = next(e for e in hdd['actions'] if e['type'] == em.EraseBasic.t)
assert erase['endTime']
assert erase['startTime']