Source code for scrutiny.core.firmware_description

#    firmware_description.py
#        Contains the class that represent a Scrutiny Firmware Description file.
#        A .sfd is a file that holds all the data related to a firmware and is identified
#        by a unique ID.
#
#   - License : MIT - See LICENSE file
#   - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
#    Copyright (c) 2022 Scrutiny Debugger

__all__ = ['SFDGenerationInfo', 'SFDMetadata', 'FirmwareDescription', 'GenerationInfoTypedDict', 'MetadataTypedDict']

import zipfile
import os
import json
import logging
import platform
from datetime import datetime
from dataclasses import dataclass

import scrutiny
import scrutiny.core.firmware_id as firmware_id
from scrutiny.core.varmap import VarMap
from scrutiny.core import path_tools
from scrutiny.core.variable import Variable
from scrutiny.core.variable_location import AbsoluteLocation
from scrutiny.core.variable_factory import VariableFactory
from scrutiny.core.basic_types import WatchableType
from scrutiny.core.alias import Alias
from scrutiny.core.basic_types import *

from scrutiny.tools.typing import *
from scrutiny.tools import validation


@dataclass(slots=True)
class VarmapElement:
    path: str
    var_or_factory: Union[Variable, VariableFactory]
    pointer_path_and_var: Optional[Tuple[str, Variable]]


class GenerationInfoTypedDict(TypedDict, total=False):
    """
    Metadata about the environment of the file creator
    """
    time: Optional[int]
    python_version: Optional[str]
    scrutiny_version: Optional[str]
    system_type: Optional[str]


class MetadataTypedDict(TypedDict, total=False):
    """
    Firmware Description metadata. Used for display in the UI (Communicated through API)
    """
    project_name: Optional[str]
    author: Optional[str]
    version: Optional[str]
    generation_info: Optional[GenerationInfoTypedDict]


[docs] @dataclass(frozen=True, slots=True) class SFDGenerationInfo: """(Immutable struct) Metadata relative to the generation of the SFD""" timestamp: Optional[datetime] """Date/time at which the SFD has been created. ``None`` if not available""" python_version: Optional[str] """Python version with which the SFD has been created. ``None`` if not available""" scrutiny_version: Optional[str] """Scrutiny version with which the SFD has been created. ``None`` if not available""" system_type: Optional[str] """Type of system on which the SFD has been created. Value given by Python `platform.system()`. ``None`` if not available""" @classmethod def make(cls) -> "SFDGenerationInfo": return SFDGenerationInfo( timestamp=datetime.now(), python_version=platform.python_version(), scrutiny_version=scrutiny.__version__, system_type=platform.system() ) def __post_init__(self) -> None: validation.assert_type_or_none(self.timestamp, 'timestamp', datetime) validation.assert_type_or_none(self.python_version, 'python_version', str) validation.assert_type_or_none(self.scrutiny_version, 'scrutiny_version', str) validation.assert_type_or_none(self.system_type, 'system_type', str) def to_dict(self) -> GenerationInfoTypedDict: timestamp = None if self.timestamp is not None: timestamp = int(round(self.timestamp.timestamp())) return { 'python_version': self.python_version, 'scrutiny_version': self.scrutiny_version, 'system_type': self.system_type, 'time': timestamp }
[docs] @dataclass(frozen=True, slots=True) class SFDMetadata: """(Immutable struct) All the metadata associated with a Scrutiny Firmware Description""" project_name: Optional[str] """Name of the project. ``None`` if not available""" author: Optional[str] """The author of this firmware. ``None`` if not available""" version: Optional[str] """The version string of this firmware. ``None`` if not available""" generation_info: SFDGenerationInfo """Metadata regarding the creation environment of the SFD file.""" def __post_init__(self) -> None: validation.assert_type_or_none(self.project_name, 'project_name', str) validation.assert_type_or_none(self.author, 'author', str) validation.assert_type_or_none(self.version, 'version', str) validation.assert_type(self.generation_info, 'generation_info', SFDGenerationInfo) def to_dict(self) -> MetadataTypedDict: return { 'project_name': self.project_name, 'author': self.author, 'version': self.version, 'generation_info': self.generation_info.to_dict() }
class FirmwareDescription: """ Scrutiny Firmware Description (SFD) is an object that contains all the relevant data about a firmware. It mainly knows its firmware ID and the list of variables with their location. Upon connection with a device, the correct SFD must be loaded, found with the firmware ID """ COMPRESSION_TYPE = zipfile.ZIP_DEFLATED VARMAP_FILENAME: str = 'varmap.json' METADATA_FILENAME: str = 'metadata.json' FIRMWAREID_FILENAME: str = 'firmwareid' ALIAS_FILE: str = 'alias.json' REQUIRED_FILES: List[str] = [ FIRMWAREID_FILENAME, METADATA_FILENAME, VARMAP_FILENAME ] varmap: VarMap metadata: SFDMetadata firmwareid: bytes aliases: Dict[str, Alias] logger: logging.Logger = logging.getLogger(__name__) def __init__(self, firmwareid: bytes, varmap: VarMap, metadata: SFDMetadata): self.firmwareid = firmwareid self.varmap = varmap self.metadata = metadata self.aliases = {} @classmethod def load_from_filesystem(cls, file_folder: str) -> "FirmwareDescription": """Load a SFD from the filesystem. Can load a compressed .sfd file or a decompressed folder with all the content in it :param file_folder: Path to the .sfd file or the decompressed folder """ if os.path.isdir(file_folder): return cls.load_from_folder(file_folder) elif os.path.isfile(file_folder): return cls.load_from_file(file_folder) raise FileNotFoundError(f"Cannot find {file_folder}") @classmethod def load_from_folder(cls, folder: str) -> "FirmwareDescription": """ Reads a folder just like if it was an unzipped Scrutiny Firmware Description (SFD) file. Used to build the SFD """ if not os.path.isdir(folder): raise FileNotFoundError("Folder %s does not exist" % folder) for file in cls.REQUIRED_FILES: if not os.path.isfile(os.path.join(folder, file)): raise FileNotFoundError('Missing %s' % file) metadata_file = os.path.join(folder, cls.METADATA_FILENAME) with open(metadata_file, 'rb') as f: metadata = cls.read_metadata(f) with open(os.path.join(folder, cls.FIRMWAREID_FILENAME), 'rb') as f: firmwareid = cls.read_firmware_id(f) varmap = cls.read_varmap_from_filesystem(folder) sfd = FirmwareDescription(firmwareid, varmap, metadata) if os.path.isfile(os.path.join(folder, cls.ALIAS_FILE)): with open(os.path.join(folder, cls.ALIAS_FILE), 'rb') as f: aliases = cls.read_aliases(f, sfd.varmap, ignore_errors=True) sfd.append_aliases(aliases.values()) return sfd @classmethod def read_metadata_from_sfd_file(cls, filename: str) -> SFDMetadata: """Reads the metadata from a .sfd file without parsing the whole file""" with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as sfd: with sfd.open(cls.METADATA_FILENAME) as f: metadata = cls.read_metadata(f) return metadata @classmethod def load_from_file(cls, filename: str) -> "FirmwareDescription": """Reads a Scrutiny Firmware Description file (.sfd) which is just a .zip containing bunch of json files """ with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as zipsfd: with zipsfd.open(cls.FIRMWAREID_FILENAME) as f: firmwareid = cls.read_firmware_id(f) # This is not a Json file. Content is raw. with zipsfd.open(cls.METADATA_FILENAME, 'r') as f: metadata = cls.read_metadata(f) # This is a json file with zipsfd.open(cls.VARMAP_FILENAME, 'r') as f: varmap = VarMap.from_file_content(f.read()) # Json file sfd = FirmwareDescription(firmwareid, varmap, metadata) if cls.ALIAS_FILE in zipsfd.namelist(): with zipsfd.open(cls.ALIAS_FILE, 'r') as f: sfd.append_aliases(cls.read_aliases(f, varmap, ignore_errors=True).values()) return sfd @classmethod def read_firmware_id_from_sfd_file(cls, filename: str) -> bytes: """Read the firmware ID from a .sfd file without parsing the whole file""" with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as sfd: with sfd.open(cls.FIRMWAREID_FILENAME) as f: return cls.read_firmware_id(f) @classmethod def read_firmware_id(cls, f: IO[bytes]) -> bytes: """Reads the firmware ID from a ``firmwareid`` file""" return bytes.fromhex(f.read().decode('ascii')) @classmethod def read_metadata(cls, f: IO[bytes]) -> SFDMetadata: """Reads the metadata from a ``metadata.json`` file""" FIELDS_TYPE: TypeAlias = List[Tuple[str, Type[Any]]] metadata_dict = cast(MetadataTypedDict, json.loads(f.read().decode('utf8'))) if not isinstance(metadata_dict, dict): metadata_dict = {} def remove_bad_fields(obj: Any, fields: FIELDS_TYPE) -> None: obj2 = cast(Dict[str, Any], obj) for field in fields: if field[0] in obj2 and not isinstance(obj2[field[0]], field[1]): del obj2[field[0]] fields1: FIELDS_TYPE = [ ('project_name', str), ('author', str), ('version', str), ('generation_info', dict) ] remove_bad_fields(metadata_dict, fields1) if 'generation_info' in metadata_dict: fields2: FIELDS_TYPE = [ ('python_version', str), ('scrutiny_version', str), ('system_type', str), ('time', int) ] remove_bad_fields(metadata_dict['generation_info'], fields2) generation_timestamp = None generation_info = cast(Optional[GenerationInfoTypedDict], metadata_dict.get('generation_info', {})) if generation_info is None: generation_info = cast(GenerationInfoTypedDict, {}) generation_timestamp = generation_info.get('time', None) return SFDMetadata( author=metadata_dict.get('author', None), project_name=metadata_dict.get('project_name', None), version=metadata_dict.get('version', None), generation_info=SFDGenerationInfo( python_version=generation_info.get('python_version', None), scrutiny_version=generation_info.get('scrutiny_version', None), system_type=generation_info.get('system_type', None), timestamp=None if generation_timestamp is None else datetime.fromtimestamp(generation_timestamp), ) ) @classmethod def read_aliases(cls, f: IO[bytes], varmap: VarMap, ignore_errors: bool = True) -> Dict[str, Alias]: """Reads the aliases from a ``alias.json`` file. :param f: The file handle to the alias file :param varmap: A VarMap to validate the alias paths :param ignore_errors: When ``True``, load the aliases even if they point to a variable that does not exist. """ aliases_raw: Dict[str, Any] = json.loads(f.read().decode('utf8')) aliases: Dict[str, Alias] = {} for k in aliases_raw: alias = Alias.from_dict(k, aliases_raw[k]) try: alias.set_target_type(cls.get_alias_target_type(alias, varmap)) except Exception as e: if ignore_errors: cls.logger.error("Cannot read alias. %s" % str(e)) else: raise e aliases[k] = alias return aliases @classmethod def get_alias_target_type(cls, alias: Alias, varmap: VarMap) -> WatchableType: """ Finds the referred entry and gives this datatype. Alias do not have a datatype by themselves """ if varmap.has_var(alias.get_target()): return WatchableType.Variable elif path_tools.is_rpv_path(alias.get_target()): return WatchableType.RuntimePublishedValue else: raise Exception('Alias %s is referencing %s which is not a valid Variable or Runtime Published Value' % (alias.get_fullpath(), alias.get_target())) @classmethod def read_varmap_from_filesystem(cls, path: str) -> VarMap: """Reads the VarMap from either a ``.json`` file or from a folder (will look for a file named ``varmap.json``)""" if os.path.isfile(path): fullpath = path elif os.path.isdir(path): fullpath = os.path.join(path, cls.VARMAP_FILENAME) else: raise Exception('Cannot find varmap file at %s' % path) return VarMap.from_file(fullpath) def append_aliases(self, aliases: Iterable[Alias]) -> None: """Add some aliases to this Firmware description""" for alias in aliases: if alias.target_type is None: alias.set_target_type(self.get_alias_target_type(alias, self.varmap)) if alias.get_fullpath() not in self.aliases: self.aliases[alias.get_fullpath()] = alias else: self.logger.warning(f'Duplicate alias {alias.get_fullpath()}. Dropping') def write(self, filename: str) -> None: """SFD file format is just a .zip with a bunch of JSON (and a firmwareid file)""" with zipfile.ZipFile(filename, mode='w', compression=self.COMPRESSION_TYPE) as outzip: outzip.writestr(self.FIRMWAREID_FILENAME, self.firmwareid.hex()) outzip.writestr(self.METADATA_FILENAME, json.dumps(self.metadata.to_dict(), indent=4)) outzip.writestr(self.VARMAP_FILENAME, self.varmap.get_json()) outzip.writestr(self.ALIAS_FILE, self.serialize_aliases(list(self.aliases.values()))) @classmethod def serialize_aliases(cls, aliases_input: Union[Dict[str, Alias], List[Alias]]) -> bytes: """ Takes bunch of alias and return a JSON containing a dict structure like this [alias1.fullpath] => alias1, [alias2.fullpath] => alias2 """ aliases: Iterable[Alias] if isinstance(aliases_input, dict): aliases = aliases_input.values() else: aliases = aliases_input dic = {alias.get_fullpath(): alias.to_dict() for alias in aliases} return json.dumps(dic, indent=4).encode('utf8') def get_firmware_id(self) -> bytes: """Return the Firwmare ID of this FirmwareDescription""" return self.firmwareid def get_firmware_id_ascii(self) -> str: """Return the Firwmare ID of this FirmwareDescription as a ASCII format""" return self.firmwareid.hex().lower() def get_endianness(self) -> Endianness: """Get the endianness of this FirmwareDescription""" return self.varmap.get_endianness() def validate(self) -> None: if not hasattr(self, 'metadata') or not hasattr(self, 'varmap') or not hasattr(self, 'firmwareid'): raise Exception('Firmware Description not loaded correctly') self.validate_metadata() self.validate_firmware_id() self.varmap.validate() def validate_firmware_id(self) -> None: """Expects a Firmware ID to have the same length as the default placeholder""" if len(self.firmwareid) != self.firmware_id_length(): raise Exception('Firmware ID seems to be the wrong length. Found %d bytes, expected %d bytes' % (len(self.firmwareid), self.firmware_id_length())) def validate_metadata(self) -> None: """Check the content of te metadata and raise warnings if something seems wrong""" if self.metadata.project_name is None: self.logger.warning(f'No valid project name defined in {self.METADATA_FILENAME}') if self.metadata.version is None: self.logger.warning(f'No valid version defined in {self.METADATA_FILENAME}') if self.metadata.author is None: self.logger.warning(f'No valid author defined in {self.METADATA_FILENAME}') def get_vars_for_datastore(self) -> Generator[VarmapElement, None, None]: """Returns all variables in this SFD with a Generator to avoid consuming memory.""" # We start by returning the absolute addressable variables first. # This allow the consumer to know about pointers before receiving pointed engtries for path, var_or_factory in self.varmap.iterate_vars([VarMap.LocationType.ABSOLUTE]): yield VarmapElement( path=path, var_or_factory=var_or_factory, pointer_path_and_var=None ) # Now that absolute address variables are given (including pointers) # we can give the pointed variables for path, var_or_factory in self.varmap.iterate_vars([VarMap.LocationType.POINTED]): if isinstance(var_or_factory, Variable): assert not isinstance(var_or_factory.location, AbsoluteLocation) ptr_path = var_or_factory.location.pointer_path if self.varmap.has_var(ptr_path): yield VarmapElement( path=path, var_or_factory=var_or_factory, pointer_path_and_var=(ptr_path, self.varmap.get_var(ptr_path)) ) else: yield VarmapElement( path=path, var_or_factory=var_or_factory, pointer_path_and_var=None ) def get_aliases_for_datastore(self, entry_type: Optional[WatchableType] = None) -> Generator[Tuple[str, Alias], None, None]: """Returns all alias in this SFD with a Generator to avoid consuming memory.""" for k in self.aliases: if entry_type is None or self.aliases[k].get_target_type() == entry_type: yield (self.aliases[k].get_fullpath(), self.aliases[k]) def get_aliases(self) -> Dict[str, Alias]: """Return all the aliases in this FirmwareDescription, indexed by their fullname""" return self.aliases def get_metadata(self) -> SFDMetadata: """Get the metadata of this FirmwareDescription""" return self.metadata @classmethod def firmware_id_length(cls) -> int: """Get the length in bytes of a firmware ID""" return len(firmware_id.PLACEHOLDER)