# datalogging.py
# Contains the definitions related to the datalogger that are globals to all modules.
# Mainly what can be stored to the storage
#
# - License : MIT - See LICENSE file.
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2023 Scrutiny Debugger
__all__ = [
'AxisDefinition',
'DataSeries',
'DataSeriesWithAxis',
'DataloggingAcquisition',
'LoggedWatchable',
'DataloggingState'
]
import zlib
import struct
import enum
from uuid import uuid4
from dataclasses import dataclass
from datetime import datetime
import csv
import logging
from scrutiny.core.basic_types import WatchableType
from scrutiny.tools import validation
from scrutiny.tools.typing import *
if TYPE_CHECKING:
import _csv
[docs]
@dataclass(frozen=True)
class AxisDefinition:
"""(Immutable struct) Represent an axis"""
name: str
"""The name of the axis. Used for display"""
axis_id: int
"""A unique ID used to identify the axis"""
def __post_init__(self) -> None:
validation.assert_type(self.name, 'name', str)
validation.assert_type(self.axis_id, 'axis_id', int)
@dataclass(frozen=True)
class LoggedWatchable:
"""(Immutable struct) A structure that identifies a watchable element"""
path: str
"""The server path of the watchable monitored"""
type: WatchableType
"""The type of watchable"""
def __post_init__(self) -> None:
validation.assert_type(self.path, 'path', str)
validation.assert_type(self.type, 'type', WatchableType)
[docs]
class DataSeries:
"""A data series is a series of measurement represented by a series of 64 bits floating point value """
name: str
"""The name of the data series. Used for display"""
logged_watchable: Optional[LoggedWatchable]
"""The server element that was the source of the data. Can be variable, alias or RPV (Runtime Published Value)"""
data: List[float]
"""The data stored as a list of 64 bits float"""
def __init__(self, data: List[float] = [], name: str = "unnamed", logged_watchable: Optional[LoggedWatchable] = None):
self.name = name
self.logged_watchable = logged_watchable
self.data = data
validation.assert_type(data, 'data', list)
validation.assert_type(name, 'name', str)
validation.assert_type(logged_watchable, 'logged_watchable', (LoggedWatchable, type(None)))
def set_data(self, data: List[float]) -> None:
self.data = data
def set_data_binary(self, data: bytes) -> None:
if not isinstance(data, bytes):
raise ValueError('Data must be bytes')
data = zlib.decompress(data)
if len(data) % 8 != 0:
raise ValueError('Invalid byte stream')
nfloat = len(data) // 8
self.data = list(struct.unpack('>' + 'd' * nfloat, data))
def get_data(self) -> List[float]:
return self.data
def get_data_binary(self) -> bytes:
data = struct.pack('>' + 'd' * len(self.data), *self.data)
return zlib.compress(data)
def __len__(self) -> int:
return len(self.data)
[docs]
@dataclass(frozen=True)
class DataSeriesWithAxis:
"""(Immutable struct) Dataseries tied to an axis definition"""
series: DataSeries
"""The dataseries containing the acquisition data"""
axis: AxisDefinition
"""The Y-Axis to which the dataseries is bound to"""
def __post_init__(self) -> None:
validation.assert_type(self.series, 'series', DataSeries)
validation.assert_type(self.axis, 'axis', AxisDefinition)
[docs]
class DataloggingAcquisition:
"""Represent an acquisition of multiple signals"""
name: Optional[str]
"""A display name associated with the acquisition for easier management"""
reference_id: str
"""ID used to reference the acquisition in the storage"""
firmware_id: str
"""Firmware ID of the device on which the acquisition has been taken"""
acq_time: datetime
"""Time at which the acquisition has been taken"""
xdata: DataSeries
"""The series of data that represent the X-Axis"""
ydata: List[DataSeriesWithAxis]
"""List of data series acquired"""
trigger_index: Optional[int]
"""Sample index of the trigger"""
firmware_name: Optional[str]
"""The firmware name taken from the metadata of the SFD loaded when the acquisition was made. ``None`` if it is not available"""
def __init__(self,
firmware_id: str,
reference_id: Optional[str] = None,
acq_time: Optional[datetime] = None,
name: Optional[str] = None,
firmware_name: Optional[str] = None):
self.reference_id = reference_id if reference_id is not None else self.make_unique_id()
self.firmware_id = firmware_id
self.acq_time = datetime.now() if acq_time is None else acq_time
self.xdata = DataSeries()
self.name = name
self.ydata = []
self.trigger_index = None
self.firmware_name = firmware_name
@classmethod
def make_unique_id(self) -> str:
return uuid4().hex.replace('-', '')
def set_xdata(self, xdata: DataSeries) -> None:
if not isinstance(xdata, DataSeries):
raise TypeError('xdata must be a Dataseries instance')
self.xdata = xdata
def add_data(self, dataseries: DataSeries, axis: AxisDefinition) -> None:
if not isinstance(dataseries, DataSeries):
raise TypeError('dataseries must be a Dataseries instance')
if not isinstance(axis, AxisDefinition):
raise TypeError('axis must be a AxisDefinition instance')
if dataseries.logged_watchable is None:
raise ValueError("Y data must be tied to a watchable")
for data in self.ydata:
if data.axis.axis_id == axis.axis_id and data.axis is not axis:
raise ValueError("Two data series are using different Y-Axis with identical external ID.")
self.ydata.append(DataSeriesWithAxis(series=dataseries, axis=axis))
def get_data(self) -> List[DataSeriesWithAxis]:
return self.ydata
def get_unique_yaxis_list(self) -> List[AxisDefinition]:
yaxis = set()
for dataseries in self.ydata:
yaxis.add(dataseries.axis)
return list(yaxis)
def find_axis_for_dataseries(self, ds: DataSeries) -> AxisDefinition:
if not isinstance(ds, DataSeries):
raise TypeError('ds must be a DataSeries instance')
for a in self.ydata:
if a.series is ds:
return a.axis
raise LookupError("Cannot find axis for given dataseries")
def set_trigger_index(self, val: Optional[int]) -> None:
if val is not None:
if not isinstance(val, int):
raise ValueError("Trigger index must be an integer")
if val < 0:
raise ValueError("Trigger index must be a positive value")
if val >= len(self.xdata.get_data()):
raise ValueError("Trigger index cannot be greater than the x-axis data length")
self.trigger_index = val
def write_csv(self, writer: '_csv._writer') -> None:
firmware_name = 'N/A' if self.firmware_name is None else self.firmware_name
writer.writerow(['Acquisition Name', self.name])
writer.writerow(['Acquisition ID', self.reference_id])
writer.writerow(['Acquisition time', self.acq_time.strftime(r"%Y-%m-%d %H:%M:%S")])
writer.writerow(['Firmware ID', self.firmware_id])
writer.writerow(['Firmware Name', firmware_name])
writer.writerow([])
header_row = [self.xdata.name] + [ydata.series.name for ydata in self.ydata]
if self.trigger_index is not None:
header_row.append('Trigger')
writer.writerow(header_row)
for ydata in self.ydata:
if len(self.xdata.data) != len(ydata.series.data):
logging.error("Data of series %s does not have the same length as the X-Axis" % ydata.series.name)
for i in range(len(self.xdata.data)):
trigger_val = []
if self.trigger_index is not None:
trigger_val = [0 if i < self.trigger_index else 1]
writer.writerow([self.xdata.data[i]] + [ydata.series.data[i] for ydata in self.ydata] + trigger_val)
[docs]
def to_csv(self, filename: str) -> None:
"""Export a :class:`DataloggingAcquisition<scrutiny.core.datalogging.DataloggingAcquisition>` content to a csv file
:param filename: The file to write to
"""
with open(filename, 'w', encoding='utf8', newline='') as f:
writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
self.write_csv(writer)
[docs]
class DataloggingState(enum.Enum):
"""(Enum) The state in which the server datalogging manager currently is"""
NA = 0
"""The state is not available"""
Standby = 1
"""The datalogger is doing nothing"""
WaitForTrigger = 2
"""The datalogger is logging and actively monitor for the trigger condition to end the acquisition"""
Acquiring = 3
"""The datalogger is actively logging and the acquisition is ending since the trigger event has been fired"""
Downloading = 4
"""The datalogger has finished logging and data is being transferred to the server"""
Error = 5
"""The datalogger has encountered a problem and is not operational"""