diff --git a/workbench-script.py b/workbench-script.py index 010d593..99dbc4a 100644 --- a/workbench-script.py +++ b/workbench-script.py @@ -3,7 +3,6 @@ import os import json import uuid -import hashlib import argparse import configparser import urllib.parse @@ -16,21 +15,17 @@ import logging from datetime import datetime -## Legacy Functions ## +SNAPSHOT_BASE = { + 'timestamp': str(datetime.now()), + 'type': 'Snapshot', + 'uuid': str(uuid.uuid4()), + 'software': "workbench-script", + 'version': "0.0.1", + 'token_hash': "", + 'data': {}, + 'erase': [] +} -def convert_to_legacy_snapshot(snapshot): - snapshot["sid"] = str(uuid.uuid4()).split("-")[0] - snapshot["software"] = "workbench-script" - snapshot["version"] = "dev" - snapshot["schema_api"] = "1.0.0" - snapshot["settings_version"] = "No Settings Version (NaN)" - snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T") - snapshot["data"]["smart"] = snapshot["data"]["disks"] - snapshot["data"]["lshw"] = json.loads(snapshot["data"]["lshw"]) - snapshot["data"].pop("disks") - snapshot.pop("erase") - -## End Legacy Functions ## ## Utility Functions ## @@ -59,17 +54,35 @@ def exec_cmd_erase(cmd): ## End Utility functions ## -SNAPSHOT_BASE = { - 'timestamp': str(datetime.now()), - 'type': 'Snapshot', - 'uuid': str(uuid.uuid4()), - 'software': "workbench-script", - 'version': "0.0.1", - 'data': {}, - 'erase': [] -} +## Legacy Functions ## +def convert_to_legacy_snapshot(snapshot): + snapshot["sid"] = str(uuid.uuid4()).split("-")[1] + snapshot["software"] = "workbench-script" + snapshot["version"] = "dev" + snapshot["schema_api"] = "1.0.0" + snapshot["settings_version"] = "No Settings Version (NaN)" + snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T") + snapshot["data"]["smart"] = snapshot["data"]["disks"] + snapshot["data"].pop("disks") + snapshot["data"].pop("inxi") + snapshot.pop("erase") + snapshot.pop("token_hash") + lshw = 'sudo lshw -xml' + hwinfo = 'sudo hwinfo --reallyall' + lspci = 'sudo lspci -vv' + + data = { + 'lshw': exec_cmd(lshw) or "{}", + 'hwinfo': exec_cmd(hwinfo), + 'lspci': exec_cmd(lspci) + } + snapshot['data'].update(data) + +## End Legacy Functions ## + + ## Command Functions ## ## Erase Functions ## ## Xavier Functions ## @@ -245,16 +258,13 @@ def smartctl(all_disks, disk=None): # TODO permitir selección # TODO permitir que vaya más rápido def get_data(all_disks): - lshw = 'sudo lshw -json' - hwinfo = 'sudo hwinfo --reallyall' dmidecode = 'sudo dmidecode' - lspci = 'sudo lspci -vv' + inxi = "sudo inxi -afmnGEMABD -x 3 --edid --output json --output-file print" + data = { - 'lshw': exec_cmd(lshw) or "{}", 'disks': smartctl(all_disks), - 'hwinfo': exec_cmd(hwinfo), 'dmidecode': exec_cmd(dmidecode), - 'lspci': exec_cmd(lspci) + 'inxi': exec_cmd(inxi) } return data @@ -266,20 +276,20 @@ def gen_snapshot(all_disks): return snapshot -def save_snapshot_in_disk(snapshot, path): +def save_snapshot_in_disk(snapshot, path, snap_uuid): snapshot_path = os.path.join(path, 'snapshots') filename = "{}/{}_{}.json".format( snapshot_path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) try: if not os.path.exists(snapshot_path): os.makedirs(snapshot_path) logger.info(_("Created snapshots directory at '%s'"), snapshot_path) with open(filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.info(_("Snapshot written in path '%s'"), filename) except Exception as e: try: @@ -287,21 +297,25 @@ def save_snapshot_in_disk(snapshot, path): fallback_filename = "{}/{}_{}.json".format( path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) with open(fallback_filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename) except Exception as e: logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e) + # TODO sanitize url, if url is like this, it fails # url = 'http://127.0.0.1:8000/api/snapshot/' -def send_snapshot_to_devicehub(snapshot, token, url, legacy): +def send_snapshot_to_devicehub(snapshot, token, url): url_components = urllib.parse.urlparse(url) ev_path = "evidence/{}".format(snapshot["uuid"]) - components = (url_components.scheme, url_components.netloc, ev_path, '', '', '') + components = (url_components.schema, url_components.netloc, ev_path, '', '', '') ev_url = urllib.parse.urlunparse(components) # apt install qrencode + qr = "echo {} | qrencode -t ANSI".format(ev_url) + print(exec_cmd(qr)) + print(ev_url) headers = { "Authorization": f"Bearer {token}", @@ -312,36 +326,15 @@ def send_snapshot_to_devicehub(snapshot, token, url, legacy): request = urllib.request.Request(url, data=data, headers=headers) with urllib.request.urlopen(request) as response: status_code = response.getcode() - response_text = response.read().decode('utf-8') + #response_text = response.read().decode('utf-8') if 200 <= status_code < 300: logger.info(_("Snapshot successfully sent to '%s'"), url) - if legacy: - try: - response = json.loads(response_text) - if response.get('url'): - # apt install qrencode - qr = "echo {} | qrencode -t ANSI".format(response['url']) - print(exec_cmd(qr)) - print("url: {}".format(response['url'])) - if response.get("dhid"): - print("dhid: {}".format(response['dhid'])) - except Exception: - logger.error(response_text) - else: - qr = "echo {} | qrencode -t ANSI".format(ev_url) - print(exec_cmd(qr)) - print(f"url: {ev_url}") else: - logger.error(_("Snapshot %s could not be sent to URL '%s'"), snapshot["uuid"], url) - # TODO review all the try-except thing here; maybe the try inside legacy does not make sense anymore - except urllib.error.HTTPError as e: - error_details = e.read().decode('utf-8') # Get the error response body - logger.error(_("Snapshot %s not remotely sent to URL '%s'. Server responded with error:\n %s"), - snapshot["uuid"], url, error_details) + logger.error(_("Snapshot cannot sent to '%s'"), url) except Exception as e: - logger.error(_("Snapshot %s not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), snapshot["uuid"], url, e) + logger.error(_("Snapshot not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e) def load_config(config_file="settings.ini"): @@ -428,7 +421,6 @@ def main(): config_file = args.config config = load_config(config_file) - legacy = config.get("legacy") # TODO show warning if non root, means data is not complete # if annotate as potentially invalid snapshot (pending the new API to be done) @@ -437,19 +429,24 @@ def main(): all_disks = get_disks() snapshot = gen_snapshot(all_disks) + snap_uuid = snapshot["uuid"] if config['erase'] and config['device'] and not config.get("legacy"): snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device']) elif config['erase'] and not config.get("legacy"): snapshot['erase'] = gen_erase(all_disks, config['erase']) - if legacy: - convert_to_legacy_snapshot(snapshot) - save_snapshot_in_disk(snapshot, config['path']) + if config.get("legacy"): + convert_to_legacy_snapshot(snapshot) + snapshot = json.dumps(snapshot) + else: + snapshot = json.dumps(snapshot) + + save_snapshot_in_disk(snapshot, config['path'], snap_uuid) if config['url']: - send_snapshot_to_devicehub(snapshot, config['token'], config['url'], legacy) + send_snapshot_to_devicehub(snapshot, config['token'], config['url']) logger.info(_("END"))