Source code for scrutiny.core.datalogging

#    datalogging.py
#        Contains the definitions related to the datalogger that are globals to all modules.
#        Mainly what can be stored to the storage
#
#   - License : MIT - See LICENSE file
#   - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
#    Copyright (c) 2023 Scrutiny Debugger

__all__ = [
    'AxisDefinition',
    'DataSeries',
    'DataSeriesWithAxis',
    'DataloggingAcquisition',
    'LoggedWatchable',
    'DataloggingState'
]

import zlib
import struct
import enum
from uuid import uuid4
from dataclasses import dataclass
from datetime import datetime
import csv
import logging
from scrutiny.core.basic_types import WatchableType

from scrutiny.tools import validation
from scrutiny.tools.typing import *

if TYPE_CHECKING:
    import _csv


[docs] @dataclass(frozen=True, slots=True) class AxisDefinition: """(Immutable struct) Represent an axis""" name: str """The name of the axis. Used for display""" axis_id: int """A unique ID used to identify the axis""" def __post_init__(self) -> None: validation.assert_type(self.name, 'name', str) validation.assert_type(self.axis_id, 'axis_id', int)
@dataclass(frozen=True, slots=True) class LoggedWatchable: """(Immutable struct) A structure that identifies a watchable element""" path: str """The server path of the watchable monitored""" type: WatchableType """The type of watchable""" def __post_init__(self) -> None: validation.assert_type(self.path, 'path', str) validation.assert_type(self.type, 'type', WatchableType)
[docs] class DataSeries: """A data series is a series of measurement represented by a series of 64 bits floating point value """ name: str """The name of the data series. Used for display""" logged_watchable: Optional[LoggedWatchable] """The server element that was the source of the data. Can be variable, alias or RPV (Runtime Published Value)""" data: List[float] """The data stored as a list of 64 bits float""" def __init__(self, data: Optional[List[float]] = None, name: str = "unnamed", logged_watchable: Optional[LoggedWatchable] = None): self.name = name self.logged_watchable = logged_watchable self.data = data if data is not None else [] validation.assert_type(self.data, 'data', list) validation.assert_type(self.name, 'name', str) validation.assert_type(self.logged_watchable, 'logged_watchable', (LoggedWatchable, type(None))) def set_data(self, data: List[float]) -> None: self.data = data
[docs] def set_data_binary(self, data: bytes) -> None: """Set the data of the dataseries from a compressed bytestream (stored in a compressed storage) :raises ValueError: If ``data`` is not a ``bytes`` object or its decompressed length is not a multiple of 8. :raises zlib.error: If ``data`` is not a valid zlib-compressed stream. """ if not isinstance(data, bytes): raise ValueError('Data must be bytes') data = zlib.decompress(data) if len(data) % 8 != 0: # Data is a stream of 64bits float (python float) raise ValueError('Invalid byte stream') nfloat = len(data) // 8 self.data = list(struct.unpack('>' + 'd' * nfloat, data))
def get_data(self) -> List[float]: return self.data
[docs] def get_data_binary(self) -> bytes: """Get the content of the dataseries in a raw compressed bytestream, to be stored in a storage""" data = struct.pack('>' + 'd' * len(self.data), *self.data) return zlib.compress(data)
def __len__(self) -> int: return len(self.data)
[docs] @dataclass(frozen=True, slots=True) class DataSeriesWithAxis: """(Immutable struct) Dataseries tied to an axis definition""" series: DataSeries """The dataseries containing the acquisition data""" axis: AxisDefinition """The Y-Axis to which the dataseries is bound to""" def __post_init__(self) -> None: validation.assert_type(self.series, 'series', DataSeries) validation.assert_type(self.axis, 'axis', AxisDefinition)
[docs] class DataloggingAcquisition: """Represent an acquisition of multiple signals""" name: Optional[str] """A display name associated with the acquisition for easier management""" reference_id: str """ID used to reference the acquisition in the storage""" firmware_id: str """Firmware ID of the device on which the acquisition has been taken""" acq_time: datetime """Time at which the acquisition has been taken""" xdata: DataSeries """The series of data that represent the X-Axis""" ydata: List[DataSeriesWithAxis] """List of data series acquired""" trigger_index: Optional[int] """Sample index of the trigger""" firmware_name: Optional[str] """The firmware name taken from the metadata of the SFD loaded when the acquisition was made. ``None`` if it is not available""" def __init__(self, firmware_id: str, reference_id: Optional[str] = None, acq_time: Optional[datetime] = None, name: Optional[str] = None, firmware_name: Optional[str] = None): self.reference_id = reference_id if reference_id is not None else self.make_unique_id() self.firmware_id = firmware_id self.acq_time = datetime.now() if acq_time is None else acq_time self.xdata = DataSeries() self.name = name self.ydata = [] self.trigger_index = None self.firmware_name = firmware_name
[docs] @classmethod def make_unique_id(cls) -> str: """Generate a unique Id used as reference id""" return uuid4().hex.replace('-', '')
[docs] def set_xdata(self, xdata: DataSeries) -> None: """Define the dataseries that serves as the X-Axis :param xdata: The dataseries to use as X-Axis :raises TypeError: If ``xdata`` is not a :class:`DataSeries` instance. """ if not isinstance(xdata, DataSeries): raise TypeError('xdata must be a Dataseries instance') self.xdata = xdata
[docs] def add_data(self, dataseries: DataSeries, axis: AxisDefinition) -> None: """Add a dataseries do the acquisition :param dataseries: The dataseries to add :param axis: The Y-Axis on which to attach this dataseries. This can axis can be a new axis or already part of the acquisition :raises TypeError: If ``dataseries`` is not a :class:`DataSeries` instance or ``axis`` is not an :class:`AxisDefinition` instance. :raises ValueError: If ``dataseries`` ID is corrupted. """ if not isinstance(dataseries, DataSeries): raise TypeError('dataseries must be a Dataseries instance') if not isinstance(axis, AxisDefinition): raise TypeError('axis must be a AxisDefinition instance') if dataseries.logged_watchable is None: raise ValueError("Y data must be tied to a watchable") for data in self.ydata: if data.axis.axis_id == axis.axis_id and data.axis is not axis: raise ValueError("Two data series are using different Y-Axis with identical external ID.") self.ydata.append(DataSeriesWithAxis(series=dataseries, axis=axis))
[docs] def get_data(self) -> List[DataSeriesWithAxis]: """Return the Y data of the acquisition. Returned value is a list of dataseries paired with their Y-axis""" return self.ydata
[docs] def get_unique_yaxis_list(self) -> List[AxisDefinition]: """Return the Y-Axes used in this acquisition. No duplicate""" yaxis = set() for dataseries in self.ydata: yaxis.add(dataseries.axis) return list(yaxis)
[docs] def find_axis_for_dataseries(self, dataseries: DataSeries) -> AxisDefinition: """Return the Y axis on which the given dataseries is bound to. :param dataseries: The dataseries to lookup :raises LookupError: If the given dataseries is not part of this acquisition :raises TypeError: If dataseries is not of the right type """ if not isinstance(dataseries, DataSeries): raise TypeError('dataseries must be a DataSeries instance') for a in self.ydata: if a.series is dataseries: return a.axis raise LookupError("Cannot find axis for given dataseries")
[docs] def set_trigger_index(self, val: Optional[int]) -> None: """Define at what sample the Trigger even has fired :raises ValueError: If ``val`` is not aa positive integer ranging from 0 to nb_points - 1. """ if val is not None: if not isinstance(val, int): raise ValueError("Trigger index must be an integer") if val < 0: raise ValueError("Trigger index must be a positive value") if val >= len(self.xdata.get_data()): raise ValueError("Trigger index cannot be greater than the x-axis data length") self.trigger_index = val
[docs] def write_csv(self, writer: '_csv._writer') -> None: """Export the acquisition to a CSV file :param writer: The CSV writer to use """ firmware_name = 'N/A' if self.firmware_name is None else self.firmware_name writer.writerow(['Acquisition Name', self.name]) writer.writerow(['Acquisition ID', self.reference_id]) writer.writerow(['Acquisition time', self.acq_time.strftime(r"%Y-%m-%d %H:%M:%S")]) writer.writerow(['Firmware ID', self.firmware_id]) writer.writerow(['Firmware Name', firmware_name]) writer.writerow([]) header_row = [self.xdata.name] + [ydata.series.name for ydata in self.ydata] if self.trigger_index is not None: header_row.append('Trigger') writer.writerow(header_row) for ydata in self.ydata: if len(self.xdata.data) != len(ydata.series.data): logging.error("Data of series %s does not have the same length as the X-Axis" % ydata.series.name) for i in range(len(self.xdata.data)): trigger_val = [] if self.trigger_index is not None: trigger_val = [0 if i < self.trigger_index else 1] writer.writerow([self.xdata.data[i]] + [ydata.series.data[i] if i <= len(ydata.series.data) else None for ydata in self.ydata] + trigger_val)
[docs] def to_csv(self, filename: str) -> None: """Export a :class:`DataloggingAcquisition<scrutiny.core.datalogging.DataloggingAcquisition>` content to a csv file :param filename: The file to write to :raises OSError: If the file cannot be opened or written to. """ with open(filename, 'w', encoding='utf8', newline='') as f: writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) self.write_csv(writer)
[docs] class DataloggingState(enum.Enum): """(Enum) The state in which the server datalogging manager currently is""" NA = 0 """The state is not available""" Standby = 1 """The datalogger is doing nothing""" WaitForTrigger = 2 """The datalogger is logging and actively monitor for the trigger condition to end the acquisition""" Acquiring = 3 """The datalogger is actively logging and the acquisition is ending since the trigger event has been fired""" Downloading = 4 """The datalogger has finished logging and data is being transferred to the server""" Error = 5 """The datalogger has encountered a problem and is not operational"""