diff --git a/lon_deployer/fastboot.py b/lon_deployer/fastboot.py new file mode 100644 index 0000000..adb5c8b --- /dev/null +++ b/lon_deployer/fastboot.py @@ -0,0 +1,86 @@ +from . import files, exceptions +from .utils import logger, console + +import os +import subprocess + + +def _fastboot_run(command: [str], serial: str = None) -> str: + try: + if not serial: + cmd = f"fastboot {' '.join(command)}" + else: + cmd = f"fastboot -s {serial} {' '.join(command)}" + fb_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True, timeout=60) + except FileNotFoundError: + console.log("Fastboot binary not found") + console.log("Exiting") + exit(1) + except subprocess.CalledProcessError: + raise exceptions.DeviceNotFound("Timed out") + else: + logger.debug(f"fb-cmd: {cmd}") + logger.debug(f"fb-out: {fb_out}") + return fb_out.decode() + + +def list_devices() -> [str]: + return list( + filter( + lambda x: x != "", + [x.split("\t ")[0] for x in _fastboot_run(["devices"]).split("\n")] + ) + ) + + +def check_device(serial: str) -> bool: + return "nabu" in _fastboot_run(["getvar", "product"], serial=serial) + + +def check_parts(serial: str) -> bool: + linux_response = _fastboot_run(["getvar", "partition-type:linux"], serial=serial) + esp_response = _fastboot_run(["getvar", "partition-type:esp"], serial=serial) + logger.debug({ + "esp": 'FAILED' not in esp_response, + "linux": 'FAILED' not in linux_response + }) + return not ("FAILED" in linux_response or "FAILED" in esp_response) + + +def reboot(serial: str) -> None: + _fastboot_run(["reboot"], serial=serial) + + +def boot_ofox(serial: str) -> None: + files.OrangeFox.get() + ofox = files.OrangeFox.filepath + with console.status("[cyan]Booting", spinner="line", spinner_style="white"): + out = _fastboot_run(["boot", ofox], serial) + if "Failed to load/authenticate boot image: Device Error" in out: + raise exceptions.FastbootException("Failed to load/authenticate boot image: Device Error") + + +def flash(serial: str, part: str, data: bytes) -> None: + with open("image.img", "wb") as file: + file.write(data) + + with console.status(f"[cyan]Flashing {part}", spinner="line", spinner_style="white"): + _fastboot_run(["flash", part, "image.img"], serial=serial) + + os.remove("image.img") + + +def restore_parts(serial: str) -> None: + gpt_both = files.GPT_Both0.get() + userdata = files.UserData_Empty.get() + flash(serial, "partition:0", gpt_both) + flash(serial, "userdata", userdata) + + +def clean_device(serial: str) -> None: + _fastboot_run(["erase", "linux"], serial=serial) + _fastboot_run(["erase", "esp"], serial=serial) + + +def wait_for_bootloader(serial: str) -> None: + _fastboot_run(["getvar", "product"], serial=serial) diff --git a/lon_deployer/utils.py b/lon_deployer/utils.py index 583579e..da3e415 100644 --- a/lon_deployer/utils.py +++ b/lon_deployer/utils.py @@ -1,11 +1,11 @@ import logging -import os import re import socket -import subprocess from random import randint from time import sleep +from . import exceptions + import adbutils from rich.logging import RichHandler from rich.console import Console @@ -18,9 +18,6 @@ from rich.progress import ( TransferSpeedColumn, ) -from . import Files - - console = Console(log_path=False) @@ -36,10 +33,6 @@ logging.basicConfig( logger = logging.getLogger("Deployer") -class DeviceNotFound(Exception): - pass - - def get_progress() -> Progress: return Progress( TextColumn("[bold blue]{task.description}", justify="right"), @@ -54,49 +47,6 @@ def get_progress() -> Progress: ) -def fastboot_run(command: [str], serial: str = None) -> str: - - try: - if not serial: - cmd = f"fastboot {' '.join(command)}" - else: - cmd = f"fastboot -s {serial} {' '.join(command)}" - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - except FileNotFoundError: - console.log("Fastboot binary not found") - console.log("Exiting") - exit(1) - else: - stdout, stderr = proc.communicate() - proc.wait() - logger.debug(f"fb-cmd: {cmd}") - logger.debug(f"fb-out: {stdout}") - logger.debug(f"fb-err: {stderr}") - return (stdout if stdout else stderr).decode() - - -def list_fb_devices() -> [str]: - return [x.split("\t ")[0] for x in fastboot_run(["devices"]).split("\n")[:-1]] - - -def check_device(serial: str) -> bool: - return "nabu" in fastboot_run(["getvar", "product"], serial=serial) - - -def check_parts(serial: str) -> bool: - linux_response = fastboot_run(["getvar", "partition-type:linux"], serial=serial) - esp_response = fastboot_run(["getvar", "partition-type:esp"], serial=serial) - logger.debug({ - "esp": 'FAILED' not in esp_response, - "linux": 'FAILED' not in linux_response - }) - return not ("FAILED" in linux_response or "FAILED" in esp_response) - - -def reboot_fb_device(serial: str) -> None: - fastboot_run(["reboot"], serial=serial) - - def check_port(tcp_port: int) -> bool: s = socket.socket() try: @@ -114,39 +64,6 @@ def get_port() -> int: return tcp_port -def boot_ofox(serial: str) -> None: - Files.OrangeFox.get() - ofox = Files.OrangeFox.filepath - with console.status("[cyan]Booting", spinner="line", spinner_style="white"): - fastboot_run(["boot", ofox]) - - -def flash_boot(serial: str, boot_data: bytes) -> None: - proper_flash(serial, "boot", boot_data) - - -def proper_flash(serial: str, part: str, data: bytes) -> None: - with open("image.img", "wb") as file: - file.write(data) - - with console.status(f"[cyan]Flashing {part}", spinner="line", spinner_style="white"): - fastboot_run(["flash", part, "image.img"], serial=serial) - - os.remove("image.img") - - -def restore_parts(serial: str) -> None: - gpt_both = Files.GPT_Both0.get() - userdata = Files.UserData_Empty.get() - proper_flash(serial, "partition:0", gpt_both) - proper_flash(serial, "userdata", userdata) - - -def clean_device(serial: str) -> None: - fastboot_run(["erase", "linux"], serial=serial) - fastboot_run(["erase", "esp"], serial=serial) - - def repartition(serial: str, size: int, percents=False) -> None: device = adbutils.adb.device(serial) block_size = device.shell("blockdev --getsize64 /dev/block/sda") @@ -156,7 +73,7 @@ def repartition(serial: str, size: int, percents=False) -> None: maxsize = 254 else: logger.error("Weird block size. Is it nabu?") - exit(4) + raise exceptions.RepartitonError() linux_max = maxsize - 12 if percents: size = round(linux_max / 100 * size, 2) @@ -175,8 +92,4 @@ def repartition(serial: str, size: int, percents=False) -> None: ] for cmd in cmds: device.shell(cmd) - sleep(3) - - -def wait_for_bootloader(serial: str) -> None: - fastboot_run(["getvar", "product"], serial=serial) + sleep(1)