Compare commits

..

5 Commits

Author SHA1 Message Date
pedro b255dddff7 Makefile: easy way to regenerate pxe setup 2024-11-12 01:21:11 +01:00
pedro fb7e768229 pxe: dnsmasq restart, do not cause big error 2024-11-12 01:20:54 +01:00
pedro 768851090a settings.ini.example: legacy, use capital boolean 2024-11-12 01:16:25 +01:00
pedro 4cb3e34b6b legacy: use public_url, url is not very useful
also refactor vars so it is easier to read
2024-11-12 01:15:34 +01:00
pedro b4d86fcc12 pxe: restart dnsmasq after applying config
useful on first init of service
2024-11-11 14:50:33 +01:00
6 changed files with 75 additions and 68 deletions

View File

@ -44,6 +44,11 @@ boot_iso_uefi_secureboot:
-drive file=deploy/iso/workbench_debug.iso,cache=none,if=virtio,format=raw,index=0,media=disk \ -drive file=deploy/iso/workbench_debug.iso,cache=none,if=virtio,format=raw,index=0,media=disk \
-boot menu=on -boot menu=on
# when you change something, you need to refresh it this way
regenerate_pxe_install:
./deploy-workbench.sh
pxe/install-pxe.sh
es_gen: es_gen:
$(MAKE) es_gen_po $(MAKE) es_gen_po
$(MAKE) es_gen_mo $(MAKE) es_gen_mo

View File

@ -327,7 +327,7 @@ echo 'Install requirements'
apt-get install -y --no-install-recommends \ apt-get install -y --no-install-recommends \
sudo locales keyboard-configuration console-setup qrencode \ sudo locales keyboard-configuration console-setup qrencode \
python-is-python3 python3 python3-dev python3-pip pipenv \ python-is-python3 python3 python3-dev python3-pip pipenv \
dmidecode smartmontools hwinfo pciutils lshw nfs-common inxi < /dev/null dmidecode smartmontools hwinfo pciutils lshw nfs-common < /dev/null
# Install lshw B02.19 utility using backports (DEPRECATED in Debian 12) # Install lshw B02.19 utility using backports (DEPRECATED in Debian 12)
#apt install -y -t ${VERSION_CODENAME}-backports lshw < /dev/null #apt install -y -t ${VERSION_CODENAME}-backports lshw < /dev/null

View File

@ -9,7 +9,7 @@ set -u
set -x set -x
main() { main() {
sudo apt install qrencode smartmontools lshw hwinfo dmidecode inxi sudo apt install smartmontools lshw hwinfo dmidecode
} }
main "${@}" main "${@}"

View File

@ -93,6 +93,7 @@ pxe-service=x86PC,"Network Boot",pxelinux
enable-tftp enable-tftp
tftp-root=${tftp_path} tftp-root=${tftp_path}
END END
sudo systemctl restart dnsmasq || true
} }
install_netboot() { install_netboot() {

View File

@ -7,4 +7,4 @@ token = 5018dd65-9abd-4a62-8896-80f34ac66150
# path = /path/to/save # path = /path/to/save
# device = your_device_name # device = your_device_name
# # erase = basic # # erase = basic
# legacy = true # legacy = True

131
workbench-script.py Executable file → Normal file
View File

@ -3,6 +3,7 @@
import os import os
import json import json
import uuid import uuid
import hashlib
import argparse import argparse
import configparser import configparser
import urllib.parse import urllib.parse
@ -15,17 +16,21 @@ import logging
from datetime import datetime from datetime import datetime
SNAPSHOT_BASE = { ## Legacy Functions ##
'timestamp': str(datetime.now()),
'type': 'Snapshot',
'uuid': str(uuid.uuid4()),
'software': "workbench-script",
'version': "0.0.1",
'token_hash': "",
'data': {},
'erase': []
}
def convert_to_legacy_snapshot(snapshot):
snapshot["sid"] = str(uuid.uuid4()).split("-")[0]
snapshot["software"] = "workbench-script"
snapshot["version"] = "dev"
snapshot["schema_api"] = "1.0.0"
snapshot["settings_version"] = "No Settings Version (NaN)"
snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T")
snapshot["data"]["smart"] = snapshot["data"]["disks"]
snapshot["data"]["lshw"] = json.loads(snapshot["data"]["lshw"])
snapshot["data"].pop("disks")
snapshot.pop("erase")
## End Legacy Functions ##
## Utility Functions ## ## Utility Functions ##
@ -54,33 +59,15 @@ def exec_cmd_erase(cmd):
## End Utility functions ## ## End Utility functions ##
## Legacy Functions ## SNAPSHOT_BASE = {
'timestamp': str(datetime.now()),
def convert_to_legacy_snapshot(snapshot): 'type': 'Snapshot',
snapshot["sid"] = str(uuid.uuid4()).split("-")[1] 'uuid': str(uuid.uuid4()),
snapshot["software"] = "workbench-script" 'software': "workbench-script",
snapshot["version"] = "dev" 'version': "0.0.1",
snapshot["schema_api"] = "1.0.0" 'data': {},
snapshot["settings_version"] = "No Settings Version (NaN)" 'erase': []
snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T") }
snapshot["data"]["smart"] = snapshot["data"]["disks"]
snapshot["data"].pop("disks")
snapshot["data"].pop("inxi")
snapshot.pop("erase")
snapshot.pop("token_hash")
lshw = 'sudo lshw -xml'
hwinfo = 'sudo hwinfo --reallyall'
lspci = 'sudo lspci -vv'
data = {
'lshw': exec_cmd(lshw) or "{}",
'hwinfo': exec_cmd(hwinfo),
'lspci': exec_cmd(lspci)
}
snapshot['data'].update(data)
## End Legacy Functions ##
## Command Functions ## ## Command Functions ##
@ -258,13 +245,16 @@ def smartctl(all_disks, disk=None):
# TODO permitir selección # TODO permitir selección
# TODO permitir que vaya más rápido # TODO permitir que vaya más rápido
def get_data(all_disks): def get_data(all_disks):
lshw = 'sudo lshw -json'
hwinfo = 'sudo hwinfo --reallyall'
dmidecode = 'sudo dmidecode' dmidecode = 'sudo dmidecode'
inxi = "sudo inxi -afmnGEMABD -x 3 --edid --output json --output-file print" lspci = 'sudo lspci -vv'
data = { data = {
'lshw': exec_cmd(lshw) or "{}",
'disks': smartctl(all_disks), 'disks': smartctl(all_disks),
'hwinfo': exec_cmd(hwinfo),
'dmidecode': exec_cmd(dmidecode), 'dmidecode': exec_cmd(dmidecode),
'inxi': exec_cmd(inxi) 'lspci': exec_cmd(lspci)
} }
return data return data
@ -276,20 +266,20 @@ def gen_snapshot(all_disks):
return snapshot return snapshot
def save_snapshot_in_disk(snapshot, path, snap_uuid): def save_snapshot_in_disk(snapshot, path):
snapshot_path = os.path.join(path, 'snapshots') snapshot_path = os.path.join(path, 'snapshots')
filename = "{}/{}_{}.json".format( filename = "{}/{}_{}.json".format(
snapshot_path, snapshot_path,
datetime.now().strftime("%Y%m%d-%H_%M_%S"), datetime.now().strftime("%Y%m%d-%H_%M_%S"),
snap_uuid) snapshot['uuid'])
try: try:
if not os.path.exists(snapshot_path): if not os.path.exists(snapshot_path):
os.makedirs(snapshot_path) os.makedirs(snapshot_path)
logger.info(_("Created snapshots directory at '%s'"), snapshot_path) logger.info(_("Created snapshots directory at '%s'"), snapshot_path)
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(snapshot) f.write(json.dumps(snapshot))
logger.info(_("Snapshot written in path '%s'"), filename) logger.info(_("Snapshot written in path '%s'"), filename)
except Exception as e: except Exception as e:
try: try:
@ -297,29 +287,21 @@ def save_snapshot_in_disk(snapshot, path, snap_uuid):
fallback_filename = "{}/{}_{}.json".format( fallback_filename = "{}/{}_{}.json".format(
path, path,
datetime.now().strftime("%Y%m%d-%H_%M_%S"), datetime.now().strftime("%Y%m%d-%H_%M_%S"),
snap_uuid) snapshot['uuid'])
with open(fallback_filename, "w") as f: with open(fallback_filename, "w") as f:
f.write(snapshot) f.write(json.dumps(snapshot))
logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename) logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename)
except Exception as e: except Exception as e:
logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e) logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e)
# TODO sanitize url, if url is like this, it fails # TODO sanitize url, if url is like this, it fails
# url = 'http://127.0.0.1:8000/api/snapshot/' # url = 'http://127.0.0.1:8000/api/snapshot/'
def send_snapshot_to_devicehub(snapshot, token, url): def send_snapshot_to_devicehub(snapshot, token, url, legacy):
url_components = urllib.parse.urlparse(url) url_components = urllib.parse.urlparse(url)
if isinstance(snapshot, str):
snapshot = json.loads(snapshot)
ev_path = "evidence/{}".format(snapshot["uuid"]) ev_path = "evidence/{}".format(snapshot["uuid"])
components = (url_components.scheme, url_components.netloc, ev_path, '', '', '') components = (url_components.scheme, url_components.netloc, ev_path, '', '', '')
ev_url = urllib.parse.urlunparse(components) ev_url = urllib.parse.urlunparse(components)
# apt install qrencode # apt install qrencode
qr = "echo {} | qrencode -t ANSI".format(ev_url)
print(exec_cmd(qr))
print(ev_url)
headers = { headers = {
"Authorization": f"Bearer {token}", "Authorization": f"Bearer {token}",
@ -330,15 +312,38 @@ def send_snapshot_to_devicehub(snapshot, token, url):
request = urllib.request.Request(url, data=data, headers=headers) request = urllib.request.Request(url, data=data, headers=headers)
with urllib.request.urlopen(request) as response: with urllib.request.urlopen(request) as response:
status_code = response.getcode() status_code = response.getcode()
#response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
if 200 <= status_code < 300: if 200 <= status_code < 300:
logger.info(_("Snapshot successfully sent to '%s'"), url) logger.info(_("Snapshot successfully sent to '%s'"), url)
if legacy:
try:
response = json.loads(response_text)
public_url = response.get('public_url')
dhid = response.get('dhid')
if public_url:
# apt install qrencode
qr = "echo {} | qrencode -t ANSI".format(public_url)
print(exec_cmd(qr))
print("url: {}".format(public_url))
if dhid:
print("dhid: {}".format(dhid))
except Exception:
logger.error(response_text)
else: else:
logger.error(_("Snapshot cannot sent to '%s'"), url) qr = "echo {} | qrencode -t ANSI".format(ev_url)
print(exec_cmd(qr))
print(f"url: {ev_url}")
else:
logger.error(_("Snapshot %s could not be sent to URL '%s'"), snapshot["uuid"], url)
# TODO review all the try-except thing here; maybe the try inside legacy does not make sense anymore
except urllib.error.HTTPError as e:
error_details = e.read().decode('utf-8') # Get the error response body
logger.error(_("Snapshot %s not remotely sent to URL '%s'. Server responded with error:\n %s"),
snapshot["uuid"], url, error_details)
except Exception as e: except Exception as e:
logger.error(_("Snapshot not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e) logger.error(_("Snapshot %s not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), snapshot["uuid"], url, e)
def load_config(config_file="settings.ini"): def load_config(config_file="settings.ini"):
@ -425,6 +430,7 @@ def main():
config_file = args.config config_file = args.config
config = load_config(config_file) config = load_config(config_file)
legacy = config.get("legacy")
# TODO show warning if non root, means data is not complete # TODO show warning if non root, means data is not complete
# if annotate as potentially invalid snapshot (pending the new API to be done) # if annotate as potentially invalid snapshot (pending the new API to be done)
@ -433,24 +439,19 @@ def main():
all_disks = get_disks() all_disks = get_disks()
snapshot = gen_snapshot(all_disks) snapshot = gen_snapshot(all_disks)
snap_uuid = snapshot["uuid"]
if config['erase'] and config['device'] and not config.get("legacy"): if config['erase'] and config['device'] and not config.get("legacy"):
snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device']) snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device'])
elif config['erase'] and not config.get("legacy"): elif config['erase'] and not config.get("legacy"):
snapshot['erase'] = gen_erase(all_disks, config['erase']) snapshot['erase'] = gen_erase(all_disks, config['erase'])
if legacy:
if config.get("legacy"):
convert_to_legacy_snapshot(snapshot) convert_to_legacy_snapshot(snapshot)
snapshot = json.dumps(snapshot)
else:
snapshot = json.dumps(snapshot)
save_snapshot_in_disk(snapshot, config['path'], snap_uuid) save_snapshot_in_disk(snapshot, config['path'])
if config['url']: if config['url']:
send_snapshot_to_devicehub(snapshot, config['token'], config['url']) send_snapshot_to_devicehub(snapshot, config['token'], config['url'], legacy)
logger.info(_("END")) logger.info(_("END"))