From 468b6f63c43121716f4958cf2d0ad6353fada0cf Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Wed, 6 Nov 2024 14:10:02 +0100 Subject: [PATCH] create credential from snapshot --- settings.ini.example | 2 + workbench-script.py | 95 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/settings.ini.example b/settings.ini.example index a51397b..1d7e7f3 100644 --- a/settings.ini.example +++ b/settings.ini.example @@ -1,6 +1,8 @@ [settings] url = http://localhost:8000/api/snapshot/ token = '1234' +# wb_sign_token = "27de6ad7-cee2-4fe8-84d4-c7eea9c969c8" +# url_wallet = "http://localhost" # path = /path/to/save # device = your_device_name # # erase = basic diff --git a/workbench-script.py b/workbench-script.py index 04667ec..9e17ff0 100644 --- a/workbench-script.py +++ b/workbench-script.py @@ -13,13 +13,30 @@ import gettext import locale import logging +from pathlib import Path +from string import Template from datetime import datetime +BASE_DIR = Path(__file__).resolve().parent + + +SNAPSHOT_BASE = { + 'timestamp': str(datetime.now()), + 'type': 'Snapshot', + 'uuid': str(uuid.uuid4()), + 'software': "workbench-script", + 'version': "0.0.1", + 'token_hash': "", + 'data': {}, + 'erase': [] +} + + ## Legacy Functions ## def convert_to_legacy_snapshot(snapshot): - snapshot["sid"] = str(uuid.uuid4()).split("-")[0] + snapshot["sid"] = str(uuid.uuid4()).split("-")[1] snapshot["software"] = "workbench-script" snapshot["version"] = "dev" snapshot["schema_api"] = "1.0.0" @@ -28,6 +45,7 @@ def convert_to_legacy_snapshot(snapshot): snapshot["data"]["smart"] = snapshot["data"]["disks"] snapshot["data"].pop("disks") snapshot.pop("erase") + snapshot.pop("token_hash") ## End Legacy Functions ## @@ -58,17 +76,16 @@ def exec_cmd_erase(cmd): ## End Utility functions ## -SNAPSHOT_BASE = { - 'timestamp': str(datetime.now()), - 'type': 'Snapshot', - 'uuid': str(uuid.uuid4()), - 'software': "workbench-script", - 'version': "0.0.1", - 'data': {}, - 'erase': [] -} - +def convert_to_credential(snapshot): + snapshot["data"] = json.dumps(snapshot["data"]) + file_path = os.path.join(BASE_DIR, "templates", "snapshot.json") + with open(file_path) as f: + ff = f.read() + template = Template(ff) + cred = template.substitute(**snapshot) + return cred + ## Command Functions ## ## Erase Functions ## ## Xavier Functions ## @@ -265,20 +282,20 @@ def gen_snapshot(all_disks): return snapshot -def save_snapshot_in_disk(snapshot, path): +def save_snapshot_in_disk(snapshot, path, snap_uuid): snapshot_path = os.path.join(path, 'snapshots') filename = "{}/{}_{}.json".format( snapshot_path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) try: if not os.path.exists(snapshot_path): os.makedirs(snapshot_path) logger.info(_("Created snapshots directory at '%s'"), snapshot_path) with open(filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.info(_("Snapshot written in path '%s'"), filename) except Exception as e: try: @@ -286,13 +303,37 @@ def save_snapshot_in_disk(snapshot, path): fallback_filename = "{}/{}_{}.json".format( path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) with open(fallback_filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename) except Exception as e: logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e) + +def send_to_sign_credential(cred, token, url): + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + + try: + data = json.dumps(cred).encode('utf-8') + request = urllib.request.Request(url, data=data, headers=headers) + with urllib.request.urlopen(request) as response: + status_code = response.getcode() + #response_text = response.read().decode('utf-8') + + if 200 <= status_code < 300: + logger.info(_("Credential successfully signed")) + else: + logger.error(_("Credential cannot signed in '%s'"), url) + + except Exception as e: + logger.error(_("Credential not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e) + + + # TODO sanitize url, if url is like this, it fails # url = 'http://127.0.0.1:8000/api/snapshot/' def send_snapshot_to_devicehub(snapshot, token, url): @@ -344,6 +385,8 @@ def load_config(config_file="settings.ini"): device = config.get('settings', 'device', fallback=None) erase = config.get('settings', 'erase', fallback=None) legacy = config.get('settings', 'legacy', fallback=None) + url_wallet = config.get('settings', 'url_wallet', fallback=None) + wb_sign_token = config.get('settings', 'wb_sign_token', fallback=None) else: logger.error(_("Config file '%s' not found. Using default values."), config_file) path = os.path.join(os.getcwd()) @@ -355,7 +398,9 @@ def load_config(config_file="settings.ini"): 'token': token, 'device': device, 'erase': erase, - 'legacy': legacy + 'legacy': legacy, + 'wb_sign_token': wb_sign_token, + 'url_wallet': url_wallet } def parse_args(): @@ -417,16 +462,30 @@ def main(): all_disks = get_disks() snapshot = gen_snapshot(all_disks) + snap_uuid = snapshot["uuid"] if config['erase'] and config['device'] and not config.get("legacy"): snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device']) elif config['erase'] and not config.get("legacy"): snapshot['erase'] = gen_erase(all_disks, config['erase']) + + if config.get("wb_sign_token"): + tk = config.get("wb_sign_token").encode("utf8") + snapshot["token_hash"] = hashlib.hash256(tk).hexdigest() + if config.get("legacy"): convert_to_legacy_snapshot(snapshot) + snapshot = json.dumps(snapshot) + else: + snapshot = convert_to_credential(snapshot) + url_wallet = config.get("url_wallet") + wb_sign_token = config.get("wb_sign_token") + if url_wallet and wb_sign_token: + snapshot = send_to_sign_credential(snapshot, wb_sign_token, url_wallet) - save_snapshot_in_disk(snapshot, config['path']) + + save_snapshot_in_disk(snapshot, config['path'], snap_uuid) if config['url']: send_snapshot_to_devicehub(snapshot, config['token'], config['url'])