# firmware_description.py
# Contains the class that represents a Scrutiny Firmware Description file.
# A .sfd is a file that holds all the data related to a firmware and is identified
# by a unique ID.
#
# - License : MIT - See LICENSE file
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2022 Scrutiny Debugger
__all__ = ['SFDGenerationInfo', 'SFDMetadata', 'FirmwareDescription', 'GenerationInfoTypedDict', 'MetadataTypedDict']
import zipfile
import os
import json
import logging
import platform
from datetime import datetime
from dataclasses import dataclass
import scrutiny
import scrutiny.core.firmware_id as firmware_id
from scrutiny.core.varmap import VarMap
from scrutiny.core import path_tools
from scrutiny.core.variable import Variable
from scrutiny.core.variable_location import AbsoluteLocation
from scrutiny.core.variable_factory import VariableFactory
from scrutiny.core.basic_types import WatchableType
from scrutiny.core.alias import Alias
from scrutiny.core.basic_types import *
from scrutiny.tools.typing import *
from scrutiny.tools import validation
@dataclass(slots=True)
class VarmapElement:
    """One entry of a varmap: a path, the object stored at that path and, optionally, its pointer."""

    # Path of the entry
    path: str
    # Variable or variable factory associated with ``path``
    var_or_factory: Union[Variable, VariableFactory]
    # ``(pointer path, pointer variable)`` when pointer information is attached to the entry, ``None`` otherwise
    pointer_path_and_var: Optional[Tuple[str, Variable]]
class GenerationInfoTypedDict(TypedDict, total=False):
    """
    Dictionary describing the environment in which a file was created.
    All keys are optional (``total=False``).
    """
    # Creation time, stored as an integer
    time: Optional[int]
    # Version of the Python interpreter
    python_version: Optional[str]
    # Version of Scrutiny
    scrutiny_version: Optional[str]
    # Type of system
    system_type: Optional[str]
class MetadataTypedDict(TypedDict, total=False):
    """
    Dictionary form of the Firmware Description metadata.
    Used for display in the UI (communicated through the API). All keys are optional (``total=False``).
    """
    # Name of the project
    project_name: Optional[str]
    # Author of the firmware
    author: Optional[str]
    # Version of the firmware
    version: Optional[str]
    # Information about the environment of the file creator
    generation_info: Optional[GenerationInfoTypedDict]
[docs]
@dataclass(frozen=True, slots=True)
class SFDGenerationInfo:
    """(Immutable struct) Information about how and where the SFD was generated"""

    timestamp: Optional[datetime]
    """Date/time at which the SFD was created. ``None`` if not available"""
    python_version: Optional[str]
    """Python version used to create the SFD. ``None`` if not available"""
    scrutiny_version: Optional[str]
    """Scrutiny version used to create the SFD. ``None`` if not available"""
    system_type: Optional[str]
    """Type of system on which the SFD was created, as given by Python `platform.system()`. ``None`` if not available"""

    @classmethod
    def make(cls) -> "SFDGenerationInfo":
        """Create a ``SFDGenerationInfo`` filled with the current time, Python version,
        Scrutiny version and system type."""
        return SFDGenerationInfo(
            system_type=platform.system(),
            scrutiny_version=scrutiny.__version__,
            python_version=platform.python_version(),
            timestamp=datetime.now(),
        )

    def __post_init__(self) -> None:
        """Check that every field is either ``None`` or of its declared type."""
        expected_types = (
            ('timestamp', datetime),
            ('python_version', str),
            ('scrutiny_version', str),
            ('system_type', str),
        )
        for field_name, expected_type in expected_types:
            validation.assert_type_or_none(getattr(self, field_name), field_name, expected_type)

    def to_dict(self) -> GenerationInfoTypedDict:
        """Convert to a dictionary. The timestamp is rounded to an integer and stored under ``time``."""
        epoch_seconds = None if self.timestamp is None else int(round(self.timestamp.timestamp()))
        as_dict: GenerationInfoTypedDict = {
            'python_version': self.python_version,
            'scrutiny_version': self.scrutiny_version,
            'system_type': self.system_type,
            'time': epoch_seconds
        }
        return as_dict
class FirmwareDescription:
    """
    A Scrutiny Firmware Description (SFD) holds all the relevant data about a firmware.
    It mainly knows its firmware ID and the list of variables with their location.
    Upon connection with a device, the correct SFD must be loaded, found with the firmware ID
    """
    # Compression method used when opening and writing the .sfd archive
    COMPRESSION_TYPE = zipfile.ZIP_DEFLATED

    # Names of the files found inside a SFD
    VARMAP_FILENAME: str = 'varmap.json'
    METADATA_FILENAME: str = 'metadata.json'
    FIRMWAREID_FILENAME: str = 'firmwareid'
    ALIAS_FILE: str = 'alias.json'

    # Files that must be present when loading a SFD from a folder
    REQUIRED_FILES: List[str] = [FIRMWAREID_FILENAME, METADATA_FILENAME, VARMAP_FILENAME]

    varmap: VarMap
    metadata: SFDMetadata
    firmwareid: bytes
    # Aliases indexed by their full path
    aliases: Dict[str, Alias]
    logger: logging.Logger = logging.getLogger(__name__)
def __init__(self, firmwareid: bytes, varmap: VarMap, metadata: SFDMetadata):
    """Create a Firmware Description from its parts. The alias list starts empty.

    :param firmwareid: The firmware ID
    :param varmap: The VarMap of the firmware
    :param metadata: The metadata of the firmware
    """
    self.aliases = {}
    self.metadata = metadata
    self.varmap = varmap
    self.firmwareid = firmwareid
@classmethod
def load_from_filesystem(cls, file_folder: str) -> "FirmwareDescription":
    """Load a SFD from the filesystem, from either a compressed .sfd file or a decompressed folder.

    :param file_folder: Path to the .sfd file or the decompressed folder
    :raises FileNotFoundError: If ``file_folder`` is neither an existing folder nor an existing file
    """
    if os.path.isdir(file_folder):
        loader = cls.load_from_folder
    elif os.path.isfile(file_folder):
        loader = cls.load_from_file
    else:
        raise FileNotFoundError(f"Cannot find {file_folder}")
    return loader(file_folder)
@classmethod
def load_from_folder(cls, folder: str) -> "FirmwareDescription":
    """
    Reads a folder as if it was an unzipped Scrutiny Firmware Description (SFD) file.
    Used to build the SFD. The alias file is optional and is read with ``ignore_errors=True``.

    :param folder: Path to the folder
    :raises FileNotFoundError: If the folder does not exist or if one of the required files is missing
    """
    if not os.path.isdir(folder):
        raise FileNotFoundError("Folder %s does not exist" % folder)

    def in_folder(name: str) -> str:
        return os.path.join(folder, name)

    # Report the first required file that is absent
    first_missing = next((name for name in cls.REQUIRED_FILES if not os.path.isfile(in_folder(name))), None)
    if first_missing is not None:
        raise FileNotFoundError('Missing %s' % first_missing)

    with open(in_folder(cls.METADATA_FILENAME), 'rb') as metadata_handle:
        metadata = cls.read_metadata(metadata_handle)
    with open(in_folder(cls.FIRMWAREID_FILENAME), 'rb') as firmwareid_handle:
        firmwareid = cls.read_firmware_id(firmwareid_handle)
    varmap = cls.read_varmap_from_filesystem(folder)

    sfd = FirmwareDescription(firmwareid, varmap, metadata)

    alias_path = in_folder(cls.ALIAS_FILE)
    if os.path.isfile(alias_path):
        with open(alias_path, 'rb') as alias_handle:
            aliases = cls.read_aliases(alias_handle, sfd.varmap, ignore_errors=True)
        sfd.append_aliases(aliases.values())
    return sfd
@classmethod
def read_metadata_from_sfd_file(cls, filename: str) -> SFDMetadata:
    """Reads only the metadata of a .sfd file, without parsing the rest of the file

    :param filename: Path to the .sfd file
    """
    with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as archive:
        with archive.open(cls.METADATA_FILENAME) as metadata_handle:
            return cls.read_metadata(metadata_handle)
@classmethod
def load_from_file(cls, filename: str) -> "FirmwareDescription":
    """Reads a Scrutiny Firmware Description file (.sfd), which is a .zip containing a bunch of json files.
    The alias file is optional and is read with ``ignore_errors=True``.

    :param filename: Path to the .sfd file
    """
    with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as archive:
        # Raw content, not JSON
        with archive.open(cls.FIRMWAREID_FILENAME) as handle:
            firmwareid = cls.read_firmware_id(handle)

        # JSON content
        with archive.open(cls.METADATA_FILENAME, 'r') as handle:
            metadata = cls.read_metadata(handle)

        # JSON content
        with archive.open(cls.VARMAP_FILENAME, 'r') as handle:
            varmap = VarMap.from_file_content(handle.read())

        sfd = FirmwareDescription(firmwareid, varmap, metadata)

        if cls.ALIAS_FILE in archive.namelist():
            with archive.open(cls.ALIAS_FILE, 'r') as handle:
                aliases = cls.read_aliases(handle, varmap, ignore_errors=True)
                sfd.append_aliases(aliases.values())
    return sfd
@classmethod
def read_firmware_id_from_sfd_file(cls, filename: str) -> bytes:
    """Reads only the firmware ID of a .sfd file, without parsing the rest of the file

    :param filename: Path to the .sfd file
    """
    with zipfile.ZipFile(filename, mode='r', compression=cls.COMPRESSION_TYPE) as archive:
        with archive.open(cls.FIRMWAREID_FILENAME) as firmwareid_handle:
            firmwareid = cls.read_firmware_id(firmwareid_handle)
    return firmwareid
@classmethod
def read_firmware_id(cls, f: IO[bytes]) -> bytes:
    """Reads the firmware ID from a ``firmwareid`` file.
    The file content is decoded as ASCII and interpreted as a hexadecimal string.

    :param f: The file handle to the ``firmwareid`` file
    """
    hex_text = f.read().decode('ascii')
    return bytes.fromhex(hex_text)
@classmethod
def read_metadata(cls, f: IO[bytes]) -> SFDMetadata:
    """Reads the metadata from a ``metadata.json`` file.

    The content is read leniently: a top-level value that is not a JSON object is treated as empty,
    and any known field of the wrong type is discarded and reported as ``None``.
    A ``time`` value that cannot be converted to a date is also reported as ``None``
    instead of aborting the load.

    :param f: The file handle to the metadata file (UTF-8 encoded JSON)
    :return: The metadata, with ``None`` for every missing or invalid field
    :raises json.JSONDecodeError: If the content is not valid JSON
    :raises UnicodeDecodeError: If the content is not valid UTF-8
    """
    FIELDS_TYPE: TypeAlias = List[Tuple[str, Type[Any]]]

    def remove_bad_fields(obj: Any, fields: FIELDS_TYPE) -> None:
        """Delete from ``obj`` every listed field whose value is not of the expected type"""
        obj2 = cast(Dict[str, Any], obj)
        for name, expected_type in fields:
            if name not in obj2:
                continue
            value = obj2[name]
            # bool is a subclass of int: a JSON true/false must not be accepted as an integer
            bool_given_for_int = expected_type is int and isinstance(value, bool)
            if bool_given_for_int or not isinstance(value, expected_type):
                del obj2[name]

    metadata_dict = cast(MetadataTypedDict, json.loads(f.read().decode('utf8')))
    if not isinstance(metadata_dict, dict):
        metadata_dict = {}

    top_level_fields: FIELDS_TYPE = [
        ('project_name', str),
        ('author', str),
        ('version', str),
        ('generation_info', dict)
    ]
    remove_bad_fields(metadata_dict, top_level_fields)

    # After the cleanup above, 'generation_info' is either absent or a dict
    generation_info = cast(GenerationInfoTypedDict, metadata_dict.get('generation_info', {}))
    generation_info_fields: FIELDS_TYPE = [
        ('python_version', str),
        ('scrutiny_version', str),
        ('system_type', str),
        ('time', int)
    ]
    remove_bad_fields(generation_info, generation_info_fields)

    timestamp: Optional[datetime] = None
    generation_timestamp = generation_info.get('time', None)
    if generation_timestamp is not None:
        try:
            timestamp = datetime.fromtimestamp(generation_timestamp)
        except (OverflowError, OSError, ValueError):
            # Out of range value for the platform: handled like any other invalid field
            timestamp = None

    return SFDMetadata(
        author=metadata_dict.get('author', None),
        project_name=metadata_dict.get('project_name', None),
        version=metadata_dict.get('version', None),
        generation_info=SFDGenerationInfo(
            python_version=generation_info.get('python_version', None),
            scrutiny_version=generation_info.get('scrutiny_version', None),
            system_type=generation_info.get('system_type', None),
            timestamp=timestamp,
        )
    )
@classmethod
def read_aliases(cls, f: IO[bytes], varmap: VarMap, ignore_errors: bool = True) -> Dict[str, Alias]:
    """Reads the aliases from a ``alias.json`` file.

    :param f: The file handle to the alias file
    :param varmap: A VarMap to validate the alias paths
    :param ignore_errors: When ``True``, an alias whose target type cannot be resolved is logged as an error
        and still returned. When ``False``, the exception is raised.
    :return: The aliases, indexed by the keys found in the file
    """
    raw_definitions: Dict[str, Any] = json.loads(f.read().decode('utf8'))
    loaded: Dict[str, Alias] = {}
    for key, definition in raw_definitions.items():
        alias = Alias.from_dict(key, definition)
        try:
            target_type = cls.get_alias_target_type(alias, varmap)
            alias.set_target_type(target_type)
        except Exception as e:
            if not ignore_errors:
                raise e
            cls.logger.error("Cannot read alias. %s" % str(e))
        loaded[key] = alias
    return loaded
@classmethod
def get_alias_target_type(cls, alias: Alias, varmap: VarMap) -> WatchableType:
    """Finds the entry referred to by an alias and returns its type. Aliases have no type of their own.

    :param alias: The alias to inspect
    :param varmap: The VarMap in which the target is searched
    :raises Exception: If the target is neither a variable of the varmap nor a Runtime Published Value path
    """
    target = alias.get_target()
    if varmap.has_var(target):
        return WatchableType.Variable
    if path_tools.is_rpv_path(target):
        return WatchableType.RuntimePublishedValue
    raise Exception('Alias %s is referencing %s which is not a valid Variable or Runtime Published Value' %
                    (alias.get_fullpath(), target))
@classmethod
def read_varmap_from_filesystem(cls, path: str) -> VarMap:
    """Reads the VarMap from a file, or from a folder (in which case the file named ``varmap.json`` is used)

    :param path: Path to the file or to the folder
    :raises Exception: If ``path`` is neither an existing file nor an existing folder
    """
    if os.path.isfile(path):
        return VarMap.from_file(path)
    if os.path.isdir(path):
        return VarMap.from_file(os.path.join(path, cls.VARMAP_FILENAME))
    raise Exception('Cannot find varmap file at %s' % path)
def append_aliases(self, aliases: Iterable[Alias]) -> None:
    """Adds aliases to this Firmware Description.
    An alias without a target type gets it resolved against the varmap first.
    An alias whose full path is already known is dropped with a warning.

    :param aliases: The aliases to add
    """
    for alias in aliases:
        if alias.target_type is None:
            resolved_type = self.get_alias_target_type(alias, self.varmap)
            alias.set_target_type(resolved_type)

        if alias.get_fullpath() in self.aliases:
            self.logger.warning(f'Duplicate alias {alias.get_fullpath()}. Dropping')
            continue
        self.aliases[alias.get_fullpath()] = alias
def write(self, filename: str) -> None:
    """Writes this Firmware Description to a .sfd file.
    The SFD file format is a .zip with a bunch of JSON files (and a firmwareid file)

    :param filename: Path of the file to write
    """
    with zipfile.ZipFile(filename, mode='w', compression=self.COMPRESSION_TYPE) as archive:
        firmwareid_hex = self.firmwareid.hex()
        archive.writestr(self.FIRMWAREID_FILENAME, firmwareid_hex)

        metadata_json = json.dumps(self.metadata.to_dict(), indent=4)
        archive.writestr(self.METADATA_FILENAME, metadata_json)

        varmap_json = self.varmap.get_json()
        archive.writestr(self.VARMAP_FILENAME, varmap_json)

        aliases_json = self.serialize_aliases(list(self.aliases.values()))
        archive.writestr(self.ALIAS_FILE, aliases_json)
@classmethod
def serialize_aliases(cls, aliases_input: Union[Dict[str, Alias], List[Alias]]) -> bytes:
    """
    Serializes a bunch of aliases into a JSON object of the form
    [alias1.fullpath] => alias1, [alias2.fullpath] => alias2

    :param aliases_input: The aliases, given as a list or as a dictionary (only the values are used)
    :return: The JSON document, UTF-8 encoded
    """
    aliases: Iterable[Alias] = aliases_input.values() if isinstance(aliases_input, dict) else aliases_input

    serializable = {}
    for alias in aliases:
        serializable[alias.get_fullpath()] = alias.to_dict()
    return json.dumps(serializable, indent=4).encode('utf8')
def get_firmware_id(self) -> bytes:
    """Return the Firmware ID of this FirmwareDescription, as raw bytes"""
    firmwareid = self.firmwareid
    return firmwareid
def get_firmware_id_ascii(self) -> str:
    """Return the Firmware ID of this FirmwareDescription as a lowercase hexadecimal string"""
    hex_id = self.firmwareid.hex()
    return hex_id.lower()
def get_endianness(self) -> Endianness:
    """Get the endianness of this FirmwareDescription, as reported by its varmap"""
    endianness = self.varmap.get_endianness()
    return endianness
def validate(self) -> None:
    """Validates the metadata, the firmware ID and the varmap, in that order.

    :raises Exception: If the metadata, the varmap or the firmware ID attribute is missing
    """
    required_attributes = ('metadata', 'varmap', 'firmwareid')
    if not all(hasattr(self, name) for name in required_attributes):
        raise Exception('Firmware Description not loaded correctly')

    self.validate_metadata()
    self.validate_firmware_id()
    self.varmap.validate()
def validate_firmware_id(self) -> None:
    """Expects the Firmware ID to have the length given by ``firmware_id_length()``

    :raises Exception: If the length differs
    """
    actual_length = len(self.firmwareid)
    expected_length = self.firmware_id_length()
    if actual_length == expected_length:
        return
    raise Exception('Firmware ID seems to be the wrong length. Found %d bytes, expected %d bytes' %
                    (actual_length, expected_length))
def validate_metadata(self) -> None:
    """Checks the content of the metadata and logs a warning for each of
    the project name, the version and the author that is ``None``"""
    metadata = self.metadata
    checks = (
        (metadata.project_name, f'No valid project name defined in {self.METADATA_FILENAME}'),
        (metadata.version, f'No valid version defined in {self.METADATA_FILENAME}'),
        (metadata.author, f'No valid author defined in {self.METADATA_FILENAME}'),
    )
    for value, message in checks:
        if value is None:
            self.logger.warning(message)
def get_vars_for_datastore(self) -> Generator[VarmapElement, None, None]:
"""Returns all variables in this SFD with a Generator to avoid consuming memory.

The entries given by the varmap for ``VarMap.LocationType.ABSOLUTE`` are yielded first,
followed by the entries given for ``VarMap.LocationType.POINTED``.

:return: A generator of ``VarmapElement``. ``pointer_path_and_var`` is always ``None`` for an absolute entry.
For a pointed entry, it can hold the pointer path and the pointer variable fetched from the varmap.
"""
# We start by returning the absolute addressable variables first.
# This allows the consumer to know about pointers before receiving pointed entries
for path, var_or_factory in self.varmap.iterate_vars([VarMap.LocationType.ABSOLUTE]):
yield VarmapElement(
path=path,
var_or_factory=var_or_factory,
pointer_path_and_var=None
)
# Now that absolute address variables are given (including pointers)
# we can give the pointed variables
for path, var_or_factory in self.varmap.iterate_vars([VarMap.LocationType.POINTED]):
# The pointer path is read from the location of a Variable
if isinstance(var_or_factory, Variable):
# A pointed variable is not expected to have an absolute location
assert not isinstance(var_or_factory.location, AbsoluteLocation)
ptr_path = var_or_factory.location.pointer_path
# The pointer variable is attached to the entry only when the varmap knows the pointer path
if self.varmap.has_var(ptr_path):
yield VarmapElement(
path=path,
var_or_factory=var_or_factory,
pointer_path_and_var=(ptr_path, self.varmap.get_var(ptr_path))
)
# NOTE(review): confirm which ``if`` this ``else`` belongs to (the ``isinstance`` test or the ``has_var`` test).
# It decides which entries are yielded without pointer information.
else:
yield VarmapElement(
path=path,
var_or_factory=var_or_factory,
pointer_path_and_var=None
)
def get_aliases_for_datastore(self, entry_type: Optional[WatchableType] = None) -> Generator[Tuple[str, Alias], None, None]:
    """Returns the aliases in this SFD with a Generator to avoid consuming memory.

    :param entry_type: When given, only the aliases with this target type are returned. ``None`` returns them all
    :return: A generator of ``(fullpath, alias)`` tuples
    """
    for alias in self.aliases.values():
        if entry_type is None or alias.get_target_type() == entry_type:
            yield (alias.get_fullpath(), alias)
def get_aliases(self) -> Dict[str, Alias]:
    """Return the dictionary of aliases held by this FirmwareDescription, indexed by their full path"""
    aliases = self.aliases
    return aliases
def get_metadata(self) -> SFDMetadata:
    """Return the metadata held by this FirmwareDescription"""
    metadata = self.metadata
    return metadata
@classmethod
def firmware_id_length(cls) -> int:
    """Get the length in bytes of a firmware ID, which is the length of the firmware ID placeholder"""
    placeholder = firmware_id.PLACEHOLDER
    return len(placeholder)