# datalogging.py
# Contains the datalogging definitions that are global to all modules.
# Mainly describes what can be saved to the storage
#
# - License : MIT - See LICENSE file
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2023 Scrutiny Debugger
# Public names exported by this module
__all__ = [
    'AxisDefinition',
    'DataSeries',
    'DataSeriesWithAxis',
    'DataloggingAcquisition',
    'LoggedWatchable',
    'DataloggingState',
]
import zlib
import struct
import enum
from uuid import uuid4
from dataclasses import dataclass
from datetime import datetime
import csv
import logging
from scrutiny.core.basic_types import WatchableType
from scrutiny.tools import validation
from scrutiny.tools.typing import *
if TYPE_CHECKING:
import _csv
[docs]
@dataclass(frozen=True, slots=True)
class AxisDefinition:
"""(Immutable struct) Represent an axis"""
name: str
"""The name of the axis. Used for display"""
axis_id: int
"""A unique ID used to identify the axis"""
def __post_init__(self) -> None:
validation.assert_type(self.name, 'name', str)
validation.assert_type(self.axis_id, 'axis_id', int)
@dataclass(frozen=True, slots=True)
class LoggedWatchable:
    """(Immutable struct) Identifies a watchable element that has been logged"""

    path: str
    """Server path of the monitored watchable"""

    type: WatchableType
    """Type of the watchable"""

    def __post_init__(self) -> None:
        check_type = validation.assert_type
        check_type(self.path, 'path', str)
        check_type(self.type, 'type', WatchableType)
class DataSeries:
    """A series of measurements, each stored as a 64 bits floating point value"""

    name: str
    """Display name of the data series"""

    logged_watchable: Optional[LoggedWatchable]
    """The server element the data came from. Can be a variable, an alias or a RPV (Runtime Published Value)"""

    data: List[float]
    """The measurements, stored as a list of 64 bits floats"""

    def __init__(self, data: Optional[List[float]] = None, name: str = "unnamed", logged_watchable: Optional[LoggedWatchable] = None):
        # A fresh list is created per instance when no data is given.
        initial_data = [] if data is None else data

        # Validation order (data, name, logged_watchable) decides which error is reported first.
        validation.assert_type(initial_data, 'data', list)
        validation.assert_type(name, 'name', str)
        validation.assert_type(logged_watchable, 'logged_watchable', (LoggedWatchable, type(None)))

        self.name = name
        self.logged_watchable = logged_watchable
        self.data = initial_data

    def set_data(self, data: List[float]) -> None:
        """Replace the content of the series with the given list. No validation is performed."""
        self.data = data

    def set_data_binary(self, data: bytes) -> None:
        """Load the series from a zlib-compressed stream of big-endian 64 bits floats (as produced by :meth:`get_data_binary`)

        :param data: The compressed bytestream

        :raises ValueError: If ``data`` is not a ``bytes`` object or its decompressed length is not a multiple of 8.
        :raises zlib.error: If ``data`` is not a valid zlib-compressed stream.
        """
        if not isinstance(data, bytes):
            raise ValueError('Data must be bytes')

        raw = zlib.decompress(data)
        # Each value is a 64 bits float, so the payload must split into 8-byte chunks.
        count, leftover = divmod(len(raw), 8)
        if leftover != 0:
            raise ValueError('Invalid byte stream')

        self.data = list(struct.unpack('>%dd' % count, raw))

    def get_data(self) -> List[float]:
        """Return the list of values held by the series"""
        return self.data

    def get_data_binary(self) -> bytes:
        """Return the series as a zlib-compressed stream of big-endian 64 bits floats, suitable for storage"""
        packed = struct.pack('>%dd' % len(self.data), *self.data)
        return zlib.compress(packed)

    def __len__(self) -> int:
        return len(self.data)
@dataclass(frozen=True, slots=True)
class DataSeriesWithAxis:
    """(Immutable struct) Pairs a dataseries with the axis it is drawn on"""

    series: DataSeries
    """The dataseries holding the acquisition data"""

    axis: AxisDefinition
    """The Y-Axis the dataseries is bound to"""

    def __post_init__(self) -> None:
        # Fields are checked in declaration order: series first, then axis.
        expected_types = (('series', DataSeries), ('axis', AxisDefinition))
        for field_name, field_type in expected_types:
            validation.assert_type(getattr(self, field_name), field_name, field_type)
class DataloggingAcquisition:
    """Represent an acquisition of multiple signals"""

    name: Optional[str]
    """A display name associated with the acquisition for easier management"""

    reference_id: str
    """ID used to reference the acquisition in the storage"""

    firmware_id: str
    """Firmware ID of the device on which the acquisition has been taken"""

    acq_time: datetime
    """Time at which the acquisition has been taken"""

    xdata: DataSeries
    """The series of data that represent the X-Axis"""

    ydata: List[DataSeriesWithAxis]
    """List of data series acquired"""

    trigger_index: Optional[int]
    """Sample index of the trigger"""

    firmware_name: Optional[str]
    """The firmware name taken from the metadata of the SFD loaded when the acquisition was made. ``None`` if it is not available"""

    def __init__(self,
                 firmware_id: str,
                 reference_id: Optional[str] = None,
                 acq_time: Optional[datetime] = None,
                 name: Optional[str] = None,
                 firmware_name: Optional[str] = None):
        """Create an empty acquisition (no Y data, empty X data, no trigger index)

        :param firmware_id: Firmware ID of the device on which the acquisition has been taken
        :param reference_id: ID used to reference the acquisition. Generated with :meth:`make_unique_id` when ``None``
        :param acq_time: Time of the acquisition. ``datetime.now()`` is used when ``None``
        :param name: Optional display name
        :param firmware_name: Optional firmware name
        """
        self.reference_id = reference_id if reference_id is not None else self.make_unique_id()
        self.firmware_id = firmware_id
        self.acq_time = datetime.now() if acq_time is None else acq_time
        self.xdata = DataSeries()
        self.name = name
        self.ydata = []
        self.trigger_index = None
        self.firmware_name = firmware_name

    @classmethod
    def make_unique_id(cls) -> str:
        """Generate a unique Id used as reference id"""
        # UUID.hex is already a 32-character hexadecimal string without dashes.
        return uuid4().hex

    def set_xdata(self, xdata: DataSeries) -> None:
        """Define the dataseries that serves as the X-Axis

        :param xdata: The dataseries to use as X-Axis

        :raises TypeError: If ``xdata`` is not a :class:`DataSeries` instance.
        """
        if not isinstance(xdata, DataSeries):
            raise TypeError('xdata must be a Dataseries instance')
        self.xdata = xdata

    def add_data(self, dataseries: DataSeries, axis: AxisDefinition) -> None:
        """Add a dataseries to the acquisition

        :param dataseries: The dataseries to add
        :param axis: The Y-Axis on which to attach this dataseries. This axis can be a new axis or already part of the acquisition

        :raises TypeError: If ``dataseries`` is not a :class:`DataSeries` instance or ``axis`` is not an :class:`AxisDefinition` instance.
        :raises ValueError: If ``dataseries`` is not tied to a watchable, or if ``axis`` has the same ``axis_id``
            as a different axis object already used by this acquisition.
        """
        if not isinstance(dataseries, DataSeries):
            raise TypeError('dataseries must be a Dataseries instance')
        if not isinstance(axis, AxisDefinition):
            raise TypeError('axis must be a AxisDefinition instance')
        if dataseries.logged_watchable is None:
            raise ValueError("Y data must be tied to a watchable")

        # An axis ID may only be reused through the very same axis object.
        for data in self.ydata:
            if data.axis.axis_id == axis.axis_id and data.axis is not axis:
                raise ValueError("Two data series are using different Y-Axis with identical external ID.")
        self.ydata.append(DataSeriesWithAxis(series=dataseries, axis=axis))

    def get_data(self) -> List[DataSeriesWithAxis]:
        """Return the Y data of the acquisition. Returned value is a list of dataseries paired with their Y-axis"""
        return self.ydata

    def get_unique_yaxis_list(self) -> List[AxisDefinition]:
        """Return the Y-Axes used in this acquisition, without duplicates, in the order they were first added"""
        # dict.fromkeys removes duplicates like a set would, but keeps insertion order,
        # so the result is stable from one run to the next.
        return list(dict.fromkeys(entry.axis for entry in self.ydata))

    def find_axis_for_dataseries(self, dataseries: DataSeries) -> AxisDefinition:
        """Return the Y axis on which the given dataseries is bound to.

        :param dataseries: The dataseries to lookup. Matched by identity, not by content.

        :raises LookupError: If the given dataseries is not part of this acquisition
        :raises TypeError: If dataseries is not of the right type
        """
        if not isinstance(dataseries, DataSeries):
            raise TypeError('dataseries must be a DataSeries instance')
        for a in self.ydata:
            if a.series is dataseries:
                return a.axis
        raise LookupError("Cannot find axis for given dataseries")

    def set_trigger_index(self, val: Optional[int]) -> None:
        """Define at what sample the trigger event has fired

        :param val: The sample index of the trigger, or ``None`` when there is no trigger index

        :raises ValueError: If ``val`` is not a positive integer ranging from 0 to nb_points - 1.
        """
        if val is not None:
            if not isinstance(val, int):
                raise ValueError("Trigger index must be an integer")
            if val < 0:
                raise ValueError("Trigger index must be a positive value")
            if val >= len(self.xdata.get_data()):
                raise ValueError("Trigger index cannot be greater than the x-axis data length")
        self.trigger_index = val

    def write_csv(self, writer: '_csv._writer') -> None:
        """Export the acquisition to a CSV file

        The output starts with a few metadata rows, followed by an empty row, a header row,
        then one row per X-Axis sample. When a trigger index is set, an extra ``Trigger`` column
        holds 0 before the trigger sample and 1 from the trigger sample onward.
        A Y series whose length differs from the X-Axis is reported through the logging module;
        its missing samples are written as empty cells.

        :param writer: The CSV writer to use
        """
        firmware_name = 'N/A' if self.firmware_name is None else self.firmware_name
        writer.writerow(['Acquisition Name', self.name])
        writer.writerow(['Acquisition ID', self.reference_id])
        writer.writerow(['Acquisition time', self.acq_time.strftime(r"%Y-%m-%d %H:%M:%S")])
        writer.writerow(['Firmware ID', self.firmware_id])
        writer.writerow(['Firmware Name', firmware_name])
        writer.writerow([])

        header_row = [self.xdata.name] + [ydata.series.name for ydata in self.ydata]
        if self.trigger_index is not None:
            header_row.append('Trigger')
        writer.writerow(header_row)

        xvalues = self.xdata.data
        for ydata in self.ydata:
            if len(xvalues) != len(ydata.series.data):
                logging.error("Data of series %s does not have the same length as the X-Axis" % ydata.series.name)

        for i, xval in enumerate(xvalues):
            row: List[Optional[float]] = [xval]
            # Strict bound check: index len(data) is already out of range, so a short series is padded with None.
            row.extend(ydata.series.data[i] if i < len(ydata.series.data) else None for ydata in self.ydata)
            if self.trigger_index is not None:
                row.append(0 if i < self.trigger_index else 1)
            writer.writerow(row)

    def to_csv(self, filename: str) -> None:
        """Export a :class:`DataloggingAcquisition<scrutiny.core.datalogging.DataloggingAcquisition>` content to a csv file

        :param filename: The file to write to

        :raises OSError: If the file cannot be opened or written to.
        """
        # newline='' lets the csv module control the line endings itself.
        with open(filename, 'w', encoding='utf8', newline='') as f:
            writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            self.write_csv(writer)
class DataloggingState(enum.Enum):
    """(Enum) State of the server datalogging manager"""

    NA = 0
    """No state is available"""

    Standby = 1
    """The datalogger is idle"""

    WaitForTrigger = 2
    """The datalogger is logging and actively monitoring for the trigger condition that will end the acquisition"""

    Acquiring = 3
    """The trigger event has fired; the datalogger is still logging and the acquisition is ending"""

    Downloading = 4
    """Logging is finished and the data is being transferred to the server"""

    Error = 5
    """The datalogger has encountered a problem and is not operational"""