# client.py
# A client that can talk with the Scrutiny server
#
# - License : MIT - See LICENSE file
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2023 Scrutiny Debugger
# Names exported by a star-import of this module. Each name is listed once.
__all__ = [
    'ScrutinyClient',
    'SFDUploadRequest',
    'SFDDownloadRequest',
    'WatchableListDownloadRequest',
]
import scrutiny.sdk
import scrutiny.sdk.datalogging
from scrutiny.sdk.pending_request import PendingRequest
from scrutiny.tools import validation
sdk = scrutiny.sdk
from scrutiny.sdk import _api_parser as api_parser
from scrutiny.sdk.definitions import *
from scrutiny.sdk.watchable_handle import WatchableHandle
from scrutiny.sdk import listeners
from scrutiny.core.basic_types import *
from scrutiny.core.firmware_description import FirmwareDescription
from scrutiny.tools.timer import Timer
from scrutiny.sdk.write_request import WriteRequest
from scrutiny.server.api import typing as api_typing
from scrutiny.server.api import API
from scrutiny.server.api.tcp_client_handler import TCPClientHandler
from scrutiny.tools.stream_datagrams import StreamMaker, StreamParser
from scrutiny.tools.profiling import VariableRateExponentialAverager
from scrutiny.tools.timebase import RelativeTimebase
from scrutiny.tools.queue import ScrutinyQueue
from scrutiny import tools
import selectors
import struct
import os
import logging
from scrutiny.core.logging import DUMPDATA_LOGLEVEL
import traceback
import threading
import socket
import json
import time
import enum
from dataclasses import dataclass
from base64 import b64encode
import queue
import types
from datetime import datetime
from pathlib import Path
from scrutiny.tools.typing import *
from scrutiny import tools
class CallbackState(enum.Enum):
    """Outcome of the callback attached to an asynchronous API request.

    A request starts in ``Pending`` and ends in one of the other states.
    """

    # Initial state: no outcome is known yet
    Pending = enum.auto()
    # The request completed without error
    OK = enum.auto()
    # No response came back within the allowed time
    TimedOut = enum.auto()
    # The request was cancelled before it completed
    Cancelled = enum.auto()
    # Failure attributed to the server (an exception may be attached to the future)
    ServerError = enum.auto()
    # Failure attributed to the callback (an exception may be attached to the future)
    CallbackError = enum.auto()
    # NOTE(review): presumably used when failures are forced for testing - confirm against usage
    SimulatedError = enum.auto()
# Signature of the callback run when an API request completes. It receives the final
# ``CallbackState`` and the server message, which is optional and may be ``None``.
ApiResponseCallback = Callable[[CallbackState, Optional[api_typing.S2CMessage]], None]
# Signature of the callback fed with each segment of a watchable list download. It receives
# the segment and a boolean telling whether this is the final segment.
NewDataCallback = Callable[[sdk.WatchableListContentPart, bool], None]
# Generic type variable (no use of it is visible in this part of the file)
T = TypeVar('T')
class ApiResponseFuture:
    """Future-like handle on the outcome of an asynchronous API request.

    Instances are created by :meth:`_register_callback<ScrutinyClient._register_callback>` and handed to
    the caller of :meth:`_send<ScrutinyClient._send>`. The worker thread completes the future when the
    server answers or when the request times out.
    """
    _state: CallbackState
    """Completion state of the request"""
    _reqid: int
    """ID of the request tracked by this future"""
    _processed_event: threading.Event
    """Event set as soon as the future is marked completed"""
    _error: Optional[Exception]
    """Exception attached to a ``CallbackError`` or ``ServerError`` outcome"""
    _default_wait_timeout: float
    """Timeout, in seconds, applied by :meth:`wait` when the caller gives none"""

    def __init__(self, reqid: int, default_wait_timeout: float) -> None:
        self._reqid = reqid
        self._default_wait_timeout = default_wait_timeout
        self._error = None
        self._state = CallbackState.Pending
        self._processed_event = threading.Event()

    def _wt_mark_completed(self, new_state: CallbackState, error: Optional[Exception] = None) -> None:
        """Record the final outcome and wake up whoever is waiting. Called by the worker thread.

        :param new_state: The final ``CallbackState`` of the request.
        :param error: Exception to attach when the outcome is a failure.
        """
        # Runs in the sdk thread, not the user thread. No lock: the state changes only once.
        # The event is set last so a woken waiter sees both the error and the state.
        self._error = error
        self._state = new_state
        self._processed_event.set()

    def wait(self, timeout: Optional[float] = None) -> None:
        """Block the calling (user) thread until completion or until ``timeout`` elapses.

        :param timeout: Maximum wait in seconds. ``None`` selects ``_default_wait_timeout``.
        """
        effective_timeout = self._default_wait_timeout if timeout is None else timeout
        self._processed_event.wait(effective_timeout)

    @property
    def state(self) -> CallbackState:
        """Completion state of the request"""
        return self._state

    @property
    def error(self) -> Optional[Exception]:
        """Exception attached to the request, ``None`` when there is none"""
        return self._error

    @property
    def error_str(self) -> str:
        """Printable description of the error, or of the state when no exception is attached"""
        if self._error is not None:
            return str(self._error)

        # States that carry a message of their own; every other state yields an empty string
        state_messages = {
            CallbackState.Pending: 'Request not sent yet',
            CallbackState.Cancelled: 'Request cancelled',
            CallbackState.TimedOut: 'Request timed out',
        }
        return state_messages.get(self._state, '')
class CallbackStorageEntry:
    """One record of the registry of active requests: everything needed to complete or expire a request"""
    _reqid: int
    """ID of the API request this entry belongs to"""
    _callback: ApiResponseCallback
    """Callable to invoke with the final state and the server response"""
    _future: ApiResponseFuture
    """Future to mark completed when the server responds or the request times out"""
    _creation_timestamp_monotonic: float
    """``time.monotonic()`` reading taken at construction, reference point for the timeout"""
    _timeout: float
    """Maximum time, in seconds, to wait for a response"""

    def __init__(self, reqid: int, callback: ApiResponseCallback, future: ApiResponseFuture, timeout: float):
        # Take the timestamp first; the remaining fields are plain copies of the arguments
        self._creation_timestamp_monotonic = time.monotonic()
        self._reqid = reqid
        self._callback = callback
        self._future = future
        self._timeout = timeout
@dataclass(slots=True)
class PendingAPIBatchWrite:
    """State of a batch of write requests that still waits for per-item completions from the server"""
    update_dict: Dict[int, WriteRequest]
    """``WriteRequest`` objects still waiting for a write completion message, keyed by batch index"""
    confirmation: api_parser.WriteConfirmation
    """Initial acknowledgment of the batch sent by the server; carries the request token"""
    creation_perf_timestamp: float
    """High-resolution timestamp taken when the batch was submitted, reference point for ``timeout``"""
    timeout: float
    """Maximum time, in seconds, for every item of the batch to be confirmed"""
class BatchWriteContext:
    """Context manager grouping several write requests into a single batch.

    Writes made inside the ``with`` block are accumulated in :attr:`requests`. When the block exits
    without an exception, the batch is flushed and awaited; when the block raises, the batch is
    discarded. In every case the client is told that the batch is over.
    """
    client: "ScrutinyClient"
    """The ``ScrutinyClient`` that owns this batch context"""
    timeout: float
    """Maximum time in seconds to wait for all write requests to complete after flushing"""
    requests: "List[WriteRequest]"
    """Write requests accumulated during the ``with`` block"""

    def __init__(self, client: "ScrutinyClient", timeout: float) -> None:
        self.client = client
        self.timeout = timeout
        self.requests = []

    def __enter__(self) -> "BatchWriteContext":
        """Enter the context and return this instance"""
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[types.TracebackType]) -> Literal[False]:
        """Flush and wait for all accumulated write requests, then end the batch.

        If an exception is propagating, the batch is ended without being flushed.

        :param exc_type: Type of the exception raised inside the block, ``None`` if none.
        :param exc_val: The exception raised inside the block, ``None`` if none.
        :param exc_tb: Traceback of the exception raised inside the block, ``None`` if none.
        :raises TimeoutException: If one or more write requests do not complete within the batch timeout.
        :returns: ``False``, so any exception raised inside the block is propagated.
        """
        try:
            if exc_type is None:
                # The flush is inside the ``try`` as well: should it fail, the client must
                # still leave batch mode instead of keeping a dead context active.
                self.client._flush_batch_write(self)
                self.client._wait_write_batch_complete(self)
        finally:
            self.client._end_batch()
        return False
class FlushPoint:
    """Sentinel pushed into the write queue to delimit the individual write requests that belong to
    a batch, so that the worker thread can recognize the boundary of that batch.

    Carries no data: only its type matters.
    """
[docs]
@dataclass(init=False)
class WatchableListDownloadRequest(PendingRequest):
    """Handle on a pending watchable list download. Lets the user wait for completion or cancel the request.

    :param client: A reference to the client object
    :param request_id: The request ID of the initiating request
    :param new_data_callback: Callback processing the segmented data as it comes in. Called from the internal thread.
    """
    _request_id: int
    """ID of the ``get_watchable_list`` request that started the download"""
    _buffered_content: sdk.WatchableListContentPart
    """Watchable definitions accumulated when no ``_new_data_callback`` is given"""
    _new_data_callback: Optional[NewDataCallback]
    """Optional callback run for each received segment, from the worker thread"""

    def __init__(self,
                 client: "ScrutinyClient",
                 request_id: int,
                 new_data_callback: Optional[NewDataCallback] = None
                 ) -> None:
        # Own fields are set before the base class initializer runs
        self._request_id = request_id
        self._buffered_content = sdk.WatchableListContentPart()
        self._new_data_callback = new_data_callback
        super().__init__(client)

    def _add_data(self, data: sdk.WatchableListContentPart, done: bool) -> None:
        """Hand a received segment to the user callback, or buffer it when there is no callback.

        :param data: The segment of watchable definitions received from the server.
        :param done: ``True`` when this is the last segment of the download.
        """
        user_callback = self._new_data_callback
        if user_callback is None:
            # Nobody consumes the segments on the fly: keep them for a later ``get()``
            self._buffered_content.extend(data)
        else:
            user_callback(data, done)
        self._update_expiration_timer()

    def cancel(self) -> None:
        """Informs the client that this request can be canceled.
        Subsequent server responses will be ignored and this request will be marked as completed, but failed.

        :raises TimeoutException: If the client fails to cancel the request. Should never happen
        :raises OperationFailure: If called on a completed request
        """
        self._client._cancel_download_watchable_list_request(self._request_id)
        try:
            # The client marks this request as failed, so the wait is expected to raise
            self.wait_for_completion(2)
        except sdk.exceptions.OperationFailure:
            pass

    def get(self) -> sdk.WatchableListContentPart:
        """Return the watchable definitions gathered by this request.

        :raises InvalidValueError: If a callback consumes the data, or if the request has not completed successfully
        :return: The buffered :class:`WatchableListContentPart<scrutiny.sdk.WatchableListContentPart>`
        """
        invalid = sdk.exceptions.InvalidValueError
        if self._new_data_callback is not None:
            raise invalid("The watchable list is not stored when a callback is provided to process the partial responses.")
        if not self.completed:
            raise invalid("Watchable list is not finished downloading")
        if not self.is_success:
            raise invalid("Watchable list failed to download fully")
        return self._buffered_content
[docs]
class SFDDownloadRequest(PendingRequest):
    """Handle on a pending SFD (Scrutiny Firmware Description) file download.

    Lets the user wait for completion, follow the progress and read the file data.
    """
    _firmware_id: str
    """Firmware ID of the SFD being downloaded"""
    _buffer: bytearray
    """Data chunks received so far, concatenated"""
    _reqid: int
    """Request ID of this download"""
    _expected_total_size: Optional[float]
    """Total size announced by the server, used to compute the progress. ``None`` until it is known"""

    def __init__(self, client: "ScrutinyClient", firmware_id: str, reqid: int) -> None:
        super().__init__(client)
        self._reqid = reqid
        self._firmware_id = firmware_id
        self._expected_total_size = None
        self._buffer = bytearray()

    def _set_expected_total_size(self, total: int) -> None:
        """Store the total file size announced by the server.

        :param total: Total number of bytes of the SFD file.
        """
        self._expected_total_size = total

    def _add_data(self, data: bytes) -> None:
        """Store a received chunk and push back the expiration of the request.

        :param data: Raw bytes received from the server.
        """
        self._buffer.extend(data)
        self._update_expiration_timer()

    @property
    def received_count(self) -> int:
        """Number of bytes received so far"""
        return len(self._buffer)

    @property
    def firmware_id(self) -> str:
        """Firmware ID of the SFD being downloaded"""
        return self._firmware_id

    def cancel(self) -> None:
        """Cancel the request and wake up any thread that called :meth:`wait_for_completion<wait_for_completion>`

        :raises OperationFailure: If the request is no longer tracked by the client.
        """
        self._client._cancel_download_sfd_request(self._reqid)

    def get(self) -> bytes:
        """Return the downloaded SFD (Scrutiny Firmware Description) data.

        The data is available once ``is_success=True``. :meth:`wait_for_completion<SFDDownloadRequest.wait_for_completion>`
        can be called to wait for all the data to be received.

        :raises InvalidValueError: If the data is not fully downloaded.
        """
        if self.is_success:
            return bytes(self._buffer)
        raise sdk.exceptions.InvalidValueError("The content is not fully downloaded yet")

    def get_progress(self) -> float:
        """Return a number between 0 and 1 telling which fraction of the file has been received"""
        expected_size = self._expected_total_size
        if not expected_size:
            # Size unknown (``None``) or announced as 0: no meaningful ratio
            return 0
        ratio = self.received_count / expected_size
        return min(1, max(0, ratio))
[docs]
class SFDUploadRequest(PendingRequest):
    """Handle on a pending SFD (Scrutiny Firmware Description) file upload. Lets the user start it, cancel it and wait for completion"""
    _firmware_id: str
    """Firmware ID of the SFD being uploaded"""
    _upload_token: str
    """Token issued by the server for this upload session"""
    _will_overwrite: bool
    """``True`` when the upload replaces an SFD already installed with the same firmware ID"""
    _started: bool
    """``True`` once the transmission of the data has been initiated. Initialised to ``False``"""
    _filepath: Path
    """Absolute, normalized path of the file being uploaded"""
    _init_reqid: int
    """Request ID of the upload initialization exchange"""
    _actual_size: int
    """Number of bytes acknowledged by the server so far"""
    _filesize: int
    """Size in bytes of the file being uploaded, read when the request is created"""
    _sfd_info: Optional[sdk.SFDInfo]
    """SFD metadata returned by the server after a successful upload, ``None`` until then"""

    def __init__(self, client: "ScrutinyClient", init_reqid: int, firmware_id: str, upload_token: str, will_overwrite: bool, filepath: Path) -> None:
        super().__init__(client)
        self._firmware_id = firmware_id
        self._init_reqid = init_reqid
        self._upload_token = upload_token
        self._will_overwrite = will_overwrite
        self._started = False
        normalized = os.path.normpath(filepath)
        self._filepath = Path(normalized).absolute()
        # Raises if the file cannot be stat'ed, which aborts the construction
        file_stat = os.stat(self._filepath)
        self._filesize = file_stat.st_size
        self._actual_size = 0
        self._sfd_info = None

    def _set_actual_size(self, size: int) -> None:
        """Store the byte count acknowledged by the server and push back the expiration of the request.

        :param size: Number of bytes the server has confirmed as received so far.
        """
        self._actual_size = size
        self._update_expiration_timer()

    def _set_sfd_info(self, sfd_info: sdk.SFDInfo) -> None:
        """Keep the SFD metadata returned by the server after a successful upload.

        :param sfd_info: The ``SFDInfo`` structure received from the server.
        """
        self._sfd_info = sfd_info

    @property
    def firmware_id(self) -> str:
        """Firmware ID of the SFD being uploaded"""
        return self._firmware_id

    @property
    def abs_filepath(self) -> str:
        """Absolute path of the file being uploaded"""
        return str(self._filepath)

    @property
    def will_overwrite(self) -> bool:
        """Tell whether the request will overwrite an existing SFD installed on the server"""
        return self._will_overwrite

    def get_sfd_info(self) -> sdk.SFDInfo:
        """Read the :class:`SFDInfo<scrutiny.sdk.SFDInfo>` structure returned by the server after a successful upload.

        :raises InvalidValueError: If no SFD info has been received yet
        :return: The uploaded file :class:`SFDInfo<scrutiny.sdk.SFDInfo>`
        """
        info = self._sfd_info
        if info is None:
            raise sdk.exceptions.InvalidValueError("The SFD info is not available")
        return info

    def start(self) -> None:
        """Ask the client to start uploading the file content. Does nothing when already started.

        :raises OperationFailure: If the upload request is no longer tracked by the client.
        """
        if self._started:
            return
        self._client._start_upload_sfd(self._init_reqid)

    def cancel(self) -> None:
        """Stop uploading to the server.

        :raises OperationFailure: If the upload request is no longer tracked by the client.
        """
        self._client._cancel_upload_sfd_request(self._init_reqid)

    def get_progress(self) -> float:
        """Return a number between 0 and 1 telling which fraction of the file the server has acknowledged"""
        total = self._filesize
        if total == 0:
            return 0
        if self.is_success:
            return 1
        return min(1, max(0, self._actual_size / total))
class DataRateMeasurements:
    """Groups the four exponential averagers measuring data and message rates in both directions"""
    rx_data_rate: VariableRateExponentialAverager
    """Averager of the incoming data rate"""
    tx_data_rate: VariableRateExponentialAverager
    """Averager of the outgoing data rate"""
    rx_message_rate: VariableRateExponentialAverager
    """Averager of the incoming message rate"""
    tx_message_rate: VariableRateExponentialAverager
    """Averager of the outgoing message rate"""

    def __init__(self) -> None:
        # The data and message averagers only differ by their ``near_zero`` parameter
        def make_averager(near_zero: float) -> VariableRateExponentialAverager:
            return VariableRateExponentialAverager(time_estimation_window=0.1, tau=0.5, near_zero=near_zero)

        self.rx_data_rate = make_averager(1)
        self.tx_data_rate = make_averager(1)
        self.rx_message_rate = make_averager(0.1)
        self.tx_message_rate = make_averager(0.1)

    def _averagers(self) -> "Tuple[VariableRateExponentialAverager, ...]":
        """Return the four averagers in the order every bulk operation applies them"""
        return (self.rx_data_rate, self.tx_data_rate, self.rx_message_rate, self.tx_message_rate)

    def update(self) -> None:
        """Call ``update()`` on every averager"""
        for averager in self._averagers():
            averager.update()

    def enable(self) -> None:
        """Call ``enable()`` on every averager"""
        for averager in self._averagers():
            averager.enable()

    def disable(self) -> None:
        """Call ``disable()`` on every averager"""
        for averager in self._averagers():
            averager.disable()

    def reset(self) -> None:
        """Call ``reset()`` on every averager"""
        for averager in self._averagers():
            averager.reset()
[docs]
class ScrutinyClient:
"""High-level client for communicating with a Scrutiny server over TCP.
Provides a thread-safe API to connect, watch variables, read/write memory,
trigger datalogging, manage SFD files, and receive asynchronous events.
A background worker thread handles all socket I/O and server message dispatch.
"""
RxMessageCallback = Callable[["ScrutinyClient", object], None]
"""Type alias for a callback invoked on every received server message. Mainly used for testing"""
_UPDATE_SERVER_STATUS_INTERVAL = 2
"""Interval in seconds between automatic server-status poll requests"""
_MAX_WRITE_REQUEST_BATCH_SIZE = 500
"""Maximum number of individual write entries that can be included in a single API batch"""
_MEMORY_READ_DATA_LIFETIME = 30
"""Seconds before a completed memory-read response is evicted from the deferred-response cache"""
_MEMORY_WRITE_DATA_LIFETIME = 30
"""Seconds before a completed memory-write response is evicted from the deferred-response cache"""
_DOWNLOAD_WATCHABLE_LIST_LIFETIME = 30
"""Seconds of inactivity before a pending watchable-list download is timed out"""
_DOWNLOAD_SFD_REQUEST_LIFETIME = 30
"""Seconds of inactivity before a pending SFD download request is timed out"""
_UPLOAD_SFD_REQUEST_LIFETIME = 30
"""Seconds of inactivity before a pending SFD upload request is timed out"""
_UNITTEST_DOWNLOAD_CHUNK_SIZE: Optional[int] = None
"""Override for the SFD upload chunk size used in unit tests. ``None`` uses the production default"""
@dataclass(frozen=True, slots=True)
class Statistics:
"""Performance metrics given by the client useful for diagnostic and debugging"""
rx_data_rate: float
"""Returns the approximated data input rate coming from the server in Bytes/sec"""
rx_message_rate: float
"""Returns the approximated message input rate coming from the server in msg/sec"""
tx_data_rate: float
"""Returns the approximated data output rate sent to the server in Bytes/sec"""
tx_message_rate: float
"""Returns the approximated message output rate sent to the server in msg/sec"""
class Events:
"""Container for all event types that ``ScrutinyClient`` can emit, along with their filter flag constants"""
@dataclass(frozen=True, slots=True)
class ConnectedEvent:
"""Triggered when the client connects to a Scrutiny server"""
_filter_flag = 0x01
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
host: str
"""The server hostname"""
port: int
"""The server port"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"Connected to a Scrutiny server at {self.host}:{self.port}"
@dataclass(frozen=True, slots=True)
class DisconnectedEvent:
"""Triggered when the client disconnects from a Scrutiny server"""
_filter_flag = 0x02
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
host: str
"""The server hostname"""
port: int
"""The server port"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"Disconnected from server at {self.host}:{self.port}"
@dataclass(frozen=True, slots=True)
class DeviceReadyEvent:
"""Triggered when the server establish a communication with a device and the handshake phase is completed"""
_filter_flag = 0x04
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
session_id: str
"""A unique ID assigned to the communication session. This ID will change if the same device disconnects and reconnects"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"A new device is connected and ready. Session ID: {self.session_id} "
@dataclass(frozen=True, slots=True)
class DeviceGoneEvent:
"""Triggered when the the communication between the server and a device stops"""
_filter_flag = 0x08
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
session_id: str
"""The unique ID assigned to the communication session"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"Device is gone. Last session ID: {self.session_id}"
@dataclass(frozen=True, slots=True)
class SFDLoadedEvent:
"""Triggered when the server loads a Scrutiny Firmware Description file, making Aliases and Variables available through the API """
_filter_flag = 0x10
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
firmware_id: str
"""The firmware ID that matches the SFD"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"Server has loaded a Firmware Description with firmware ID: {self.firmware_id}"
@dataclass(frozen=True, slots=True)
class SFDUnLoadedEvent:
"""Triggered when the server unloads a Scrutiny Firmware Description file"""
_filter_flag = 0x20
"""Bitmask used to match this event type against the ``_enabled_events`` flags"""
firmware_id: str
"""The firmware ID that matches the SFD"""
def msg(self) -> str:
"""Return a human-readable description of this event"""
return f"Server has unloaded a Firmware Description with firmware ID: {self.firmware_id}"
@dataclass(frozen=True, slots=True)
class DataloggingStateChanged:
    """Event raised when the server datalogging service changes state or when the acquisition/download completion ratio is updated"""
    _filter_flag = 0x40
    """Bit identifying this event type in the ``_enabled_events`` mask"""
    details: sdk.DataloggingInfo
    """State of the datalogging service and completion ratio"""

    def msg(self) -> str:
        """Build a printable description of this event, with the completion percentage when one is available"""
        info = self.details
        description = f"Datalogging state changed: {info.state.name}"
        if info.completion_ratio is None:
            return description
        # Ratio shown as a whole percentage
        return description + f" ({round(info.completion_ratio * 100)}%)"
@dataclass(frozen=True, slots=True)
class StatusUpdateEvent:
    """Event raised when a new server status is received"""
    _filter_flag = 0x80
    """Bit identifying this event type in the ``_enabled_events`` mask"""
    info: sdk.ServerInfo
    """The server status that was received"""

    def msg(self) -> str:
        """Build a printable description of this event. The text does not depend on ``info``"""
        return "New server status update received"
@dataclass(frozen=True, slots=True)
class DataloggingListChanged:
    """Event raised when the list of datalogging acquisitions changes on the server (new, removed or updated).

    :meth:`read_datalogging_acquisitions_metadata<ScrutinyClient.read_datalogging_acquisitions_metadata>`
    can be called to fetch the details."""
    _filter_flag = 0x100
    """Bit identifying this event type in the ``_enabled_events`` mask"""
    change_type: DataloggingListChangeType
    """Action performed on the datalogging list. Useful to update the client side list correctly"""
    acquisition_reference_id: Optional[str]
    """The targeted acquisition. Has a value for NEW, DELETE, UPDATE. ``None`` for DELETE_ALL"""

    def msg(self) -> str:
        """Build a printable description of this event. The text does not depend on the fields"""
        return "List of datalogging acquisition has changed"
# Filter flags, one bit per event class. They are meant to be ORed together to build
# the ``enabled_events`` value given to the ``ScrutinyClient`` constructor.
LISTEN_NONE = 0x0
"""Listen to no events"""
LISTEN_CONNECTED = ConnectedEvent._filter_flag
"""Listen for events of type :class:`ConnectedEvent<scrutiny.sdk.client.ScrutinyClient.Events.ConnectedEvent>`"""
LISTEN_DISCONNECTED = DisconnectedEvent._filter_flag
"""Listen for events of type :class:`DisconnectedEvent<scrutiny.sdk.client.ScrutinyClient.Events.DisconnectedEvent>`"""
LISTEN_DEVICE_READY = DeviceReadyEvent._filter_flag
"""Listen for events of type :class:`DeviceReadyEvent<scrutiny.sdk.client.ScrutinyClient.Events.DeviceReadyEvent>`"""
LISTEN_DEVICE_GONE = DeviceGoneEvent._filter_flag
"""Listen for events of type :class:`DeviceGoneEvent<scrutiny.sdk.client.ScrutinyClient.Events.DeviceGoneEvent>`"""
LISTEN_SFD_LOADED = SFDLoadedEvent._filter_flag
"""Listen for events of type :class:`SFDLoadedEvent<scrutiny.sdk.client.ScrutinyClient.Events.SFDLoadedEvent>`"""
LISTEN_SFD_UNLOADED = SFDUnLoadedEvent._filter_flag
"""Listen for events of type :class:`SFDUnLoadedEvent<scrutiny.sdk.client.ScrutinyClient.Events.SFDUnLoadedEvent>`"""
LISTEN_DATALOGGING_STATE_CHANGED = DataloggingStateChanged._filter_flag
"""Listen for events of type :class:`DataloggingStateChanged<scrutiny.sdk.client.ScrutinyClient.Events.DataloggingStateChanged>`"""
LISTEN_STATUS_UPDATE_CHANGED = StatusUpdateEvent._filter_flag
"""Listen for events of type :class:`StatusUpdateEvent<scrutiny.sdk.client.ScrutinyClient.Events.StatusUpdateEvent>`"""
LISTEN_DATALOGGING_LIST_CHANGED = DataloggingListChanged._filter_flag
"""Listen for events of type :class:`DataloggingListChanged<scrutiny.sdk.client.ScrutinyClient.Events.DataloggingListChanged>`"""
# The 32 low bits set: this also covers bits that no event class declared above uses
LISTEN_ALL = 0xFFFFFFFF
"""Listen to all events"""
# Union of every event class declared above; used to type the client event queue
_ANY_EVENTS = Union[
ConnectedEvent, DisconnectedEvent, DeviceReadyEvent, DeviceGoneEvent,
SFDLoadedEvent, SFDUnLoadedEvent, DataloggingStateChanged, StatusUpdateEvent,
DataloggingListChanged
]
@dataclass(slots=True)
class _ThreadingEvents:
"""Container for ``threading.Event`` objects used to coordinate between the user thread and the worker thread"""
stop_worker_thread: threading.Event
"""Signals the worker thread to exit its loop and terminate"""
disconnect: threading.Event
"""Signals the worker thread to perform a graceful disconnect"""
disconnected: threading.Event
"""Set by the worker thread after it has completed the disconnect sequence"""
msg_received: threading.Event
"""Set each time a server message is received; used for synchronization"""
server_status_updated: threading.Event
"""Set when a fresh server-status response has been processed"""
sync_complete: threading.Event
"""Set by the worker thread after completing a full processing iteration when ``require_sync`` was set"""
require_sync: threading.Event
"""Set by the user thread to request the worker thread to complete one full iteration"""
welcome_received: threading.Event
"""Set when the server's initial welcome message has been received and parsed"""
def __init__(self) -> None:
"""Initialize all threading events in their unset state"""
self.stop_worker_thread = threading.Event()
self.disconnect = threading.Event()
self.disconnected = threading.Event()
self.msg_received = threading.Event()
self.server_status_updated = threading.Event()
self.sync_complete = threading.Event()
self.require_sync = threading.Event()
self.welcome_received = threading.Event()
_name: Optional[str]
"""Optional human-readable name for this client instance, used in log messages"""
_server_state: ServerState
"""Current communication state with the server (disconnected, connecting, or connected)."""
_hostname: Optional[str]
"""Hostname of the server, set during :meth:`connect` and cleared on disconnect"""
_port: Optional[int]
"""TCP port of the server, set during :meth:`connect` and cleared on disconnect"""
_logger: logging.Logger
"""Logger instance for this client"""
_encoding: str
"""String encoding used for the JSON API wire format (always ``utf8``)"""
_sock: Optional[socket.socket]
"""Active TCP socket, or ``None`` when disconnected"""
_selector: Optional[selectors.DefaultSelector]
"""I/O selector used to poll the socket for incoming data"""
_stream_parser: StreamParser
"""Stateful parser that reconstructs complete JSON datagrams from the raw TCP byte stream"""
_stream_maker: StreamMaker
"""Encoder that frames outgoing JSON messages as length-prefixed datagrams"""
_rx_message_callbacks: List[RxMessageCallback]
"""Callbacks invoked for every received server message, primarily used for testing"""
_reqid: int
"""Monotonically increasing counter used to generate unique request IDs"""
_timeout: float
"""Default timeout in seconds for server requests"""
_write_timeout: float
"""Default timeout in seconds for write-to-device requests"""
_request_status_timer: Timer
"""Timer that fires periodically to trigger automatic server-status poll requests"""
_require_status_update: bool
"""``True`` when a server-status request must be sent on the next worker-thread iteration"""
_write_request_queue: "ScrutinyQueue[Union[WriteRequest, FlushPoint, BatchWriteContext]]"
"""Thread-safe queue through which the user thread submits write requests to the worker thread"""
_pending_api_batch_writes: Dict[str, PendingAPIBatchWrite]
"""Active batch write operations indexed by their request token, awaiting per-item completions"""
_memory_read_completion_dict: Dict[str, api_parser.MemoryReadCompletion]
"""Completed memory-read responses cached until the user thread retrieves them, indexed by request token"""
_memory_write_completion_dict: Dict[str, api_parser.MemoryWriteCompletion]
"""Completed memory-write responses cached until the user thread retrieves them, indexed by request token"""
_pending_datalogging_requests: Dict[str, sdk.datalogging.DataloggingRequest]
"""Active datalogging requests indexed by their request token"""
_pending_watchable_download_request: Dict[int, WatchableListDownloadRequest]
"""Active watchable-list download request handles indexed by request ID"""
_pending_sfd_download_requests: Dict[int, SFDDownloadRequest]
"""Active SFD download request handles indexed by request ID"""
_pending_sfd_upload_requests: Dict[int, SFDUploadRequest]
"""Active SFD upload request handles indexed by request ID. Multiple request IDs may map to the same handle"""
_worker_thread: Optional[threading.Thread]
"""Background thread responsible for all socket I/O and message dispatch"""
_threading_events: _ThreadingEvents
"""Group of ``threading.Event`` objects used to coordinate the user thread and the worker thread"""
_sock_lock: threading.Lock
"""Lock protecting access to ``_sock`` during send and disconnect operations"""
_main_lock: threading.Lock
"""Lock protecting the main internal-state variables shared between threads"""
_user_lock: threading.Lock
"""Lock protecting resources that the SDK user may read from the main thread"""
_callback_storage: Dict[int, CallbackStorageEntry]
"""Pending server requests indexed by request ID, holding callbacks and futures"""
_watchable_storage: Dict[str, WatchableHandle]
"""Cache of ``WatchableHandle`` objects given to the user, indexed by server-assigned ID"""
_watchable_path_to_id_map: Dict[str, str]
"""Mapping from a watchable's display path to its server-assigned ID"""
_server_info: Optional[ServerInfo]
"""Most-recently received server status, or ``None`` if no status has arrived yet"""
_last_server_info: Optional[ServerInfo]
"""Server status from the previous update cycle, used to detect state transitions"""
_active_batch_context: Optional[BatchWriteContext]
"""The currently open batch write context, or ``None`` when not inside a :meth:`batch_write` block"""
_listeners: List[listeners.BaseListener]
"""Registered listener objects that receive watchable-value update notifications"""
_event_queue: "ScrutinyQueue[Events._ANY_EVENTS]"
"""Thread-safe queue holding pending events for the user to consume via :meth:`read_event`."""
_enabled_events: int
"""Bitmask of event types currently enabled, built from ``Events.LISTEN_*`` constants"""
_datarate_measurements: DataRateMeasurements
"""Tracks bidirectional data and message rates with the server"""
_server_timebase: RelativeTimebase
"""Converts server-relative microsecond timestamps to absolute ``datetime`` values"""
_force_fail_request: bool
"""When ``True``, all outgoing requests are immediately failed. Used in unit tests"""
def __enter__(self) -> "ScrutinyClient":
"""Enter the context manager and return this client instance"""
return self
def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[types.TracebackType]) -> Literal[False]:
"""Exit the context manager and disconnect from the server.
:returns: ``False``, so any exception raised inside the block is propagated.
"""
self.disconnect()
return False
# (Sphinx "[docs]" source-link artifact removed here; it was not part of the code)
def __init__(self,
             name: Optional[str] = None,
             rx_message_callbacks: Optional[List[RxMessageCallback]] = None,
             timeout: float = 4.0,
             write_timeout: float = 5.0,
             enabled_events: int = Events.LISTEN_NONE
             ):
    """
    Build a client able to talk to a Scrutiny server. No connection is opened here.

    :param name: Name of the client. Only used to build the logger name
    :param rx_message_callbacks: Callbacks invoked each time a server message is received. Called from a separate thread.
        Mainly used for debugging and testing
    :param timeout: Default timeout to use when making a request to the server
    :param write_timeout: Default timeout to use when writing to the device memory
    :param enabled_events: A flag value constructed by ORing values from :class:`ScrutinyClient.Events<scrutiny.sdk.client.ScrutinyClient.Events>`. Can
        be changed later by invoking :meth:`listen_events<listen_events>`. See :ref:`Using events<page_using_events>` for more details
    """
    # --- Identity / logging
    class_name = self.__class__.__name__
    logger_name = class_name if name is None else class_name + f"[{name}]"
    self._logger = logging.getLogger(logger_name)
    self._name = name

    # --- Connection state. Nothing is connected at construction time
    self._server_state = ServerState.Disconnected
    self._hostname = None
    self._port = None
    self._encoding = 'utf8'
    self._sock = None
    self._selector = None
    self._rx_message_callbacks = rx_message_callbacks if rx_message_callbacks is not None else []

    # --- Threading primitives
    self._worker_thread = None
    self._threading_events = self._ThreadingEvents()
    self._sock_lock = threading.Lock()
    self._main_lock = threading.Lock()
    self._user_lock = threading.Lock()

    # --- Request bookkeeping
    self._reqid = 0
    self._timeout = timeout
    self._write_timeout = write_timeout
    self._request_status_timer = Timer(self._UPDATE_SERVER_STATUS_INTERVAL)
    self._require_status_update = False
    self._server_info = None
    self._last_server_info = None

    # --- Pending operations, all empty at startup
    self._write_request_queue = ScrutinyQueue()
    self._pending_api_batch_writes = {}
    self._memory_read_completion_dict = {}
    self._memory_write_completion_dict = {}
    self._pending_datalogging_requests = {}
    self._pending_watchable_download_request = {}
    self._pending_sfd_download_requests = {}
    self._pending_sfd_upload_requests = {}

    # --- Watchables and callbacks
    self._watchable_storage = {}
    self._watchable_path_to_id_map = {}
    self._callback_storage = {}
    self._connection_cancel_request = False
    self._active_batch_context = None
    self._listeners = []
    self._locked_for_connect = False

    # --- Stream framing and measurements
    self._stream_parser = TCPClientHandler.get_compatible_stream_parser()
    self._stream_maker = TCPClientHandler.get_compatible_stream_maker()
    self._datarate_measurements = DataRateMeasurements()

    # --- Events
    self._event_queue = ScrutinyQueue(maxsize=100)   # Not supposed to go much above 1 or 2
    self.listen_events(enabled_events)

    self._force_fail_request = False
    self._server_timebase = RelativeTimebase()
def _trigger_event(self, evt: Events._ANY_EVENTS, loglevel: int = logging.NOTSET) -> None:
    """Enqueue ``evt`` for the user, provided its type is part of the enabled-events bitmask.

    When the queue is full, the event is dropped and an error is logged instead of raising.

    :param evt: The event object to enqueue.
    :param loglevel: Level at which ``evt.msg()`` is logged, when the logger is enabled for that level.
    """
    if not (self._enabled_events & evt._filter_flag):
        return  # The user does not listen for this kind of event

    try:
        if self._logger.isEnabledFor(loglevel):
            self._logger.log(loglevel, evt.msg())
        self._event_queue.put_nowait(evt)
    except queue.Full:
        self._logger.error("Event queue is full. Dropping event")
def _start_worker_thread(self) -> None:
"""Spawn the background worker thread and block until it has started"""
self._threading_events.stop_worker_thread.clear()
self._threading_events.disconnect.clear()
started_event = threading.Event()
self._worker_thread = threading.Thread(target=self._worker_thread_task, args=[started_event], daemon=True)
self._worker_thread.start()
started_event.wait()
self._logger.debug('Worker thread started')
def _stop_worker_thread(self) -> None:
"""Signal the worker thread to stop and block until it has exited"""
if self._worker_thread is not None:
self._logger.debug("Stopping worker thread")
if self._worker_thread.is_alive():
self._threading_events.stop_worker_thread.set()
self._worker_thread.join()
self._logger.debug("Worker thread stopped")
else:
self._logger.debug("Worker thread already stopped")
self._worker_thread = None
def _worker_thread_task(self, started_event: threading.Event) -> None:
"""Main loop of the background worker thread.
Processes incoming messages, drives periodic server-status updates, dispatches
write requests, and handles disconnect requests and connection errors.
:param started_event: Set immediately after initialization so the spawning thread can unblock.
"""
self._require_status_update = True # Bootstrap status update loop
self._datarate_measurements.enable()
started_event.set()
self._request_status_timer.start()
# _sock will be None after a disconnect
last_deferred_response_timeout_check = time.monotonic()
while not self._threading_events.stop_worker_thread.is_set() and self._sock is not None:
require_sync_before = False
try:
if self._threading_events.require_sync.is_set():
require_sync_before = True
self._wt_process_next_server_status_update()
for msg in self._wt_recv(timeout=0.005):
self._wt_process_rx_api_message(cast(api_typing.S2CMessage, msg))
self._wt_check_callbacks_timeouts()
if time.monotonic() - last_deferred_response_timeout_check > 1.0: # Avoid locking the main lock too often
self._check_deferred_response_timeouts()
last_deferred_response_timeout_check = time.monotonic()
self._wt_process_write_watchable_requests()
self._wt_process_device_state()
self._datarate_measurements.update()
except sdk.exceptions.ConnectionError as e:
if self._connection_cancel_request:
self._logger.debug(f"Connection error in worker thread (caused by explicit cancel): {e}")
else:
self._logger.error(f"Connection error in worker thread: {e}")
self._wt_disconnect() # Will set _sock to None
except Exception as e:
tools.log_exception(self._logger, e, "Unhandled exception in worker thread")
self._wt_disconnect() # Will set _sock to None
if self._threading_events.disconnect.is_set():
self._logger.debug(f"User required to disconnect")
self._wt_disconnect() # Will set _sock to None
self._threading_events.disconnected.set()
if require_sync_before:
self._threading_events.require_sync.clear()
self._threading_events.sync_complete.set()
self._datarate_measurements.disable()
self._logger.debug('Worker thread is exiting')
self._threading_events.stop_worker_thread.clear()
def _wt_process_msg_inform_server_status(self, msg: api_typing.S2C.InformServerStatus, reqid: Optional[int]) -> None:
    """Process an ``INFORM_SERVER_STATUS`` message: store the new server info and notify whoever waits for it.

    The status polling timer is restarted first, before the message is parsed.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    self._request_status_timer.start()
    server_info = api_parser.parse_inform_server_status(msg)
    self._logger.debug('Updating server status')

    with self._main_lock:
        self._server_info = server_info
        self._threading_events.server_status_updated.set()

    status_event = self.Events.StatusUpdateEvent(info=server_info)
    self._trigger_event(status_event)
def _wt_process_msg_watchable_update(self, msg: api_typing.S2C.WatchableUpdate, reqid: Optional[int]) -> None:
    """Process a ``WATCHABLE_UPDATE`` message: push every new value into its handle, then notify the listeners.

    Updates targeting a server ID that is not in the watchable storage are logged as errors and skipped.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    refreshed: List[WatchableHandle] = []
    for item in api_parser.parse_watchable_update(msg):
        # Only the storage lookup is done under the lock
        with self._main_lock:
            handle = self._watchable_storage.get(item.server_id)

        if handle is None:
            self._logger.error(f"Got watchable update for unknown watchable. Server ID={item.server_id}")
            continue

        if self._logger.isEnabledFor(DUMPDATA_LOGLEVEL):    # pragma: no cover
            self._logger.log(DUMPDATA_LOGLEVEL, f"Updating value of {item.server_id} ({handle.name})")

        stamp = self._server_timebase.micro_to_dt(item.server_time_us)
        if item.value is not None:
            handle._update_value(item.value, timestamp=stamp)
        else:
            # No value came with the update: only the status is applied
            handle._set_invalid(item.value_status, timestamp=stamp)
        refreshed.append(handle)

    for listener in self._listeners:
        listener._broadcast_update(refreshed)
def _wt_process_msg_inform_write_completion(self, msg: api_typing.S2C.WriteCompletion, reqid: Optional[int]) -> None:
    """Process an ``INFORM_WRITE_COMPLETION`` message: complete the matching write request and drop it from its batch.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    completion = api_parser.parse_write_completion(msg)

    batch_write = self._pending_api_batch_writes.get(completion.request_token)
    if batch_write is None:
        return   # Maybe triggered by another client. Silently ignore.

    remaining = batch_write.update_dict
    index = completion.batch_index
    if index not in remaining:
        self._logger.error("The server returned a write completion with an unknown batch_index")
        return

    write_request = remaining[index]
    if not completion.success:
        write_request._mark_complete(False, "Server failed to write to the device", server_time_us=completion.server_time_us)
    else:
        write_request._watchable._set_last_write_datetime()
        write_request._mark_complete(True, server_time_us=completion.server_time_us)
    del remaining[index]    # A treated request leaves the batch
def _wt_process_msg_inform_memory_read_complete(self, msg: api_typing.S2C.ReadMemoryComplete, reqid: Optional[int]) -> None:
    """Process an ``INFORM_MEMORY_READ_COMPLETE`` message: store the completion under its request token.

    A second completion for a token that is already stored is logged as an error and ignored.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    completion = api_parser.parse_memory_read_completion(msg)
    token = completion.request_token
    with self._main_lock:
        if token in self._memory_read_completion_dict:
            self._logger.error(f"Received duplicate memory read completion with request token {completion.request_token}")
        else:
            self._memory_read_completion_dict[token] = completion
def _wt_process_msg_inform_memory_write_complete(self, msg: api_typing.S2C.WriteMemoryComplete, reqid: Optional[int]) -> None:
    """Process an ``INFORM_MEMORY_WRITE_COMPLETE`` message: store the completion under its request token.

    A second completion for a token that is already stored is logged as an error and ignored.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    completion = api_parser.parse_memory_write_completion(msg)
    token = completion.request_token
    with self._main_lock:
        if token in self._memory_write_completion_dict:
            self._logger.error(f"Received duplicate memory write completion with request token {completion.request_token}")
        else:
            self._memory_write_completion_dict[token] = completion
def _wt_process_msg_datalogging_acquisition_complete(self, msg: api_typing.S2C.InformDataloggingAcquisitionComplete, reqid: Optional[int]) -> None:
    """Process a datalogging acquisition-complete notice: complete the pending request that owns the token, then forget it.

    A notice whose token matches no pending request only produces a warning.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    completion = api_parser.parse_datalogging_acquisition_complete(msg)
    token = completion.request_token
    pending = self._pending_datalogging_requests

    if token not in pending:
        self._logger.warning('Received a notice of completion for a datalogging acquisition, but its request_token was unknown')
        return

    pending[token]._mark_complete_specialized(completion.success, completion.reference_id, completion.detail_msg)
    del pending[token]
def _wt_process_msg_datalogging_list_changed(self, msg: api_typing.S2C.InformDataloggingListChanged, reqid: Optional[int]) -> None:
    """Process a datalogging-list-changed notice by emitting a ``DataloggingListChanged`` event, logged at debug level.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    change = api_parser.parse_datalogging_list_changed(msg)
    list_event = self.Events.DataloggingListChanged(acquisition_reference_id=change.reference_id, change_type=change.action)
    self._trigger_event(list_event, loglevel=logging.DEBUG)
def _wt_process_msg_get_watchable_list_response(self, msg: api_typing.S2C.GetWatchableList, reqid: Optional[int]) -> None:
    """Process one chunk of a watchable-list response and hand it to the download request registered under ``reqid``.

    The request is marked as successfully completed when the chunk is flagged as the last one.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID identifying the download request. The message is ignored when ``None``.
    """
    if reqid is None:
        self._logger.warning('Received a watchable list message, but the request ID was not available.')
        return

    chunk = api_parser.parse_get_watchable_list(msg)
    with self._main_lock:
        download = self._pending_watchable_download_request.get(reqid)
        if download is None:
            self._logger.warning(f'Received a watchable list message, but the request ID was is not tied to any active request {reqid}')
            return

    download._add_data(chunk.data, chunk.done)
    if chunk.done:
        download._mark_complete(success=True)
def _wt_process_msg_welcome(self, msg: api_typing.S2C.Welcome, reqid: Optional[int]) -> None:
    """Process the server welcome message: set the zero of the server timebase and flag the welcome as received.

    :param msg: The server-to-client welcome message to parse.
    :param reqid: The request ID echoed by the server, or ``None`` if absent. Unused.
    """
    welcome = api_parser.parse_welcome(msg)
    time_zero = welcome.server_time_zero_timestamp
    self._server_timebase.set_zero_to(time_zero)
    self._threading_events.welcome_received.set()
def _wt_process_msg_download_sfd_response(self, msg: api_typing.S2C.DownloadSFD, reqid: Optional[int]) -> None:
    """Process one chunk of an SFD download and append it to the download request registered under ``reqid``.

    The request succeeds once the received byte count equals the announced total size,
    and fails if the count goes above it.

    :param msg: The server-to-client message to parse.
    :param reqid: The request ID identifying the download request. The message is ignored when ``None``.
    """
    if reqid is None:
        self._logger.warning('Received a SFD chunk, but the request ID was not available.')
        return

    chunk = api_parser.parse_download_sfd_response(msg)
    download = self._pending_sfd_download_requests.get(reqid)     # Single dict operation, no lock taken
    if download is None:
        self._logger.warning(f'Received a SFD chunk, but the request ID was is not tied to any active request {reqid}')
        return

    total_size = chunk.total_size
    download._set_expected_total_size(total_size)
    download._add_data(chunk.data)

    received = download.received_count
    if received == total_size:
        download._mark_complete(success=True)
    elif received > total_size:
        download._mark_complete(success=False, failure_reason="Received too much data, the server is unreliable")
def _wt_process_msg_upload_sfd_data_response(self, msg: api_typing.S2C.UploadSFDData, reqid: Optional[int]) -> None:
    """Handle the server response to an SFD upload data chunk.

    Records the size acknowledged by the server on the upload request registered under ``reqid``
    and marks that request as successfully completed when the server reports the upload as completed.

    :param msg: The parsed server-to-client message.
    :param reqid: The request ID identifying the upload request. The message is ignored when ``None``.
    """
    if reqid is None:
        # This is an upload acknowledgement, not a downloaded chunk: say so in the log
        self._logger.warning('Received an SFD upload chunk response, but the request ID was not available.')
        return

    content = api_parser.parse_upload_sfd_data_response(msg)
    try:
        req = self._pending_sfd_upload_requests[reqid]    # atomic
    except KeyError:
        self._logger.warning(f'Received an SFD upload chunk response, but the request ID was is not tied to any active request {reqid}')
        return

    req._set_actual_size(content.actual_size)
    if content.completed:
        assert content.sfd_info is not None   # Not None if completed. Guaranteed by parser
        req._set_sfd_info(content.sfd_info)
        req._mark_complete(success=True)
def _wt_process_next_server_status_update(self) -> None:
    """Send a ``GET_SERVER_STATUS`` request when the polling timer has elapsed or an immediate update was requested.

    The timer is stopped when the request is sent; it is started again by
    ``_wt_process_msg_inform_server_status`` when a status message is processed.
    """
    if self._request_status_timer.is_timed_out() or self._require_status_update:
        self._require_status_update = False
        self._request_status_timer.stop()
        # The client logger is stored in ``_logger``; ``__init__`` assigns no ``logger`` attribute
        self._logger.debug("Requesting server status update")
        req = self._make_request(API.Command.Client2Api.GET_SERVER_STATUS)
        self._send(req)  # No callback, we have a continuous listener
def _wt_check_callbacks_timeouts(self) -> None:
    """Scan all pending callbacks and fire a ``TimedOut`` completion for any that have exceeded their timeout.

    The callback is invoked with ``CallbackState.TimedOut`` and no message. An exception raised by
    the callback is logged and does not prevent the future from being completed, except for a
    connection error, which is propagated to the caller.

    :raises sdk.exceptions.ConnectionError: If raised by a callback.
    """
    now = time.monotonic()
    # Snapshot the IDs so the lock is not held while callbacks run
    with self._main_lock:
        reqids = list(self._callback_storage.keys())

    for reqid in reqids:
        with self._main_lock:
            callback_entry: Optional[CallbackStorageEntry] = None
            if reqid in self._callback_storage:
                callback_entry = self._callback_storage[reqid]

        if callback_entry is None:
            continue    # Removed since the snapshot was taken

        if now - callback_entry._creation_timestamp_monotonic > callback_entry._timeout:
            try:
                callback_entry._callback(CallbackState.TimedOut, None)
            except (sdk.exceptions.ConnectionError):
                raise
            except Exception as e:
                # Best effort: the timeout is still reported through the future, but leave a trace of the failure
                tools.log_exception(self._logger, e, f"Callback raised an exception while handling a timeout. reqid={reqid}.")

            callback_entry._future._wt_mark_completed(CallbackState.TimedOut)

            with self._main_lock:
                if reqid in self._callback_storage:
                    del self._callback_storage[reqid]
def _check_deferred_response_timeouts(self) -> None:
"""Expire and remove stale pending-request objects (memory reads/writes, watchable downloads, SFD transfers) that have received no data for too long"""
# Release the handle to pending request objects after a certain amount of time.
with self._main_lock:
for kstr in list(self._memory_read_completion_dict.keys()):
if time.monotonic() - self._memory_read_completion_dict[kstr].local_monotonic_timestamp > self._MEMORY_READ_DATA_LIFETIME:
del self._memory_read_completion_dict[kstr]
for kstr in list(self._memory_write_completion_dict.keys()):
if time.monotonic() - self._memory_write_completion_dict[kstr].local_monotonic_timestamp > self._MEMORY_WRITE_DATA_LIFETIME:
del self._memory_write_completion_dict[kstr]
for kint in list(self._pending_watchable_download_request.keys()):
wd_req = self._pending_watchable_download_request[kint]
if wd_req._is_expired(self._DOWNLOAD_WATCHABLE_LIST_LIFETIME):
if not wd_req.completed:
wd_req._mark_complete(False, "No server data for too long")
del self._pending_watchable_download_request[kint]
for kint in list(self._pending_sfd_download_requests.keys()):
sfdd_req = self._pending_sfd_download_requests[kint]
if sfdd_req._is_expired(self._DOWNLOAD_SFD_REQUEST_LIFETIME):
if not sfdd_req.completed:
sfdd_req._mark_complete(False, "No server data for too long")
del self._pending_sfd_download_requests[kint]
for kint in list(self._pending_sfd_upload_requests.keys()):
sfdu_req = self._pending_sfd_upload_requests[kint]
if sfdu_req._is_expired(self._UPLOAD_SFD_REQUEST_LIFETIME):
if not sfdu_req.completed:
sfdu_req._mark_complete(False, "No server response for too long")
del self._pending_sfd_upload_requests[kint]
def _add_rx_message_callback(self, callback: RxMessageCallback) -> None:
    """Register one more callback to be invoked on every received server message. Mostly for testing

    :param callback: The callable to append to the list of reception callbacks.
    """
    registered = self._rx_message_callbacks
    registered.append(callback)
def _wt_process_rx_api_message(self, msg: api_typing.S2CMessage) -> None:
    """Route a message received from the server to the handler matching its ``cmd`` field.

    The reception callbacks are invoked first. A message with no ``cmd`` field is logged and dropped.
    A ``BadResponseError`` raised by a handler is logged. When the message carries a ``reqid``,
    the callback registered for that request is processed afterwards.

    :param msg: The raw server-to-client JSON message dictionary.
    """
    self._threading_events.msg_received.set()

    # These callbacks are mainly for testing.
    for rx_callback in self._rx_message_callbacks:
        rx_callback(self, msg)

    reqid: Optional[int] = msg.get('reqid', None)
    cmd: Optional[str] = msg.get('cmd', None)

    if cmd is None:
        self._logger.error('Got a message without a "cmd" field')
        self._logger.debug(msg)
        return

    commands = API.Command.Api2Client
    # Checked in order with ==, the first match wins
    handlers: Tuple[Tuple[str, Callable[[Any, Optional[int]], None]], ...] = (
        (commands.WATCHABLE_UPDATE, self._wt_process_msg_watchable_update),
        (commands.INFORM_SERVER_STATUS, self._wt_process_msg_inform_server_status),
        (commands.INFORM_WRITE_COMPLETION, self._wt_process_msg_inform_write_completion),
        (commands.INFORM_MEMORY_READ_COMPLETE, self._wt_process_msg_inform_memory_read_complete),
        (commands.INFORM_MEMORY_WRITE_COMPLETE, self._wt_process_msg_inform_memory_write_complete),
        (commands.INFORM_DATALOGGING_ACQUISITION_COMPLETE, self._wt_process_msg_datalogging_acquisition_complete),
        (commands.INFORM_DATALOGGING_LIST_CHANGED, self._wt_process_msg_datalogging_list_changed),
        (commands.GET_WATCHABLE_LIST_RESPONSE, self._wt_process_msg_get_watchable_list_response),
        (commands.WELCOME, self._wt_process_msg_welcome),
        (commands.DOWNLOAD_SFD_RESPONSE, self._wt_process_msg_download_sfd_response),
        (commands.UPLOAD_SFD_DATA_RESPONSE, self._wt_process_msg_upload_sfd_data_response),
    )

    try:
        for command, handler in handlers:
            if cmd == command:
                handler(msg, reqid)
                break
    except sdk.exceptions.BadResponseError as e:
        tools.log_exception(self._logger, e, "Bad message from server")

    if reqid is not None:   # message is a response to a request
        self._wt_process_callbacks(cmd, msg, reqid)
def _wt_process_callbacks(self, cmd: str, msg: api_typing.S2CMessage, reqid: int) -> None:
    """Invoke the registered callback for a given ``reqid``, updating the associated future with the outcome.

    On an ``ERROR_RESPONSE``, the callback is called with ``CallbackState.ServerError`` and the future
    is completed with that state; an exception raised by the callback is logged. Otherwise the
    callback is called with ``CallbackState.OK``; if it raises, the future is completed with
    ``CallbackState.CallbackError``, and if it does not, a future still pending is completed with
    ``CallbackState.OK``. The callback entry is then removed from the storage.

    Independently of any callback, an ``ERROR_RESPONSE`` also fails the watchable-download,
    SFD-download and SFD-upload requests registered under the same ``reqid``.

    :param cmd: The API command string from the server message.
    :param msg: The full server-to-client message.
    :param reqid: The request ID whose callback should be invoked.
    :raises sdk.exceptions.ConnectionError: If raised by the callback.
    """
    callback_entry: Optional[CallbackStorageEntry] = None
    with self._main_lock:
        if reqid in self._callback_storage:
            callback_entry = self._callback_storage[reqid]

    # We have a callback for that response
    if callback_entry is not None:
        error: Optional[Exception] = None

        if cmd == API.Command.Api2Client.ERROR_RESPONSE:
            error = Exception(msg.get('msg', "No error message provided"))
            self._logger.error(f"Server returned an error response. reqid={reqid}. {error}")

            try:
                callback_entry._callback(CallbackState.ServerError, msg)
            except (sdk.exceptions.ConnectionError):
                raise
            except Exception as e:
                # Best effort: the server error is what gets reported to the future, but leave a trace of the failure
                tools.log_exception(self._logger, e, f"Callback raised an exception while handling a server error. cmd={cmd}, reqid={reqid}.")
            finally:
                callback_entry._future._wt_mark_completed(CallbackState.ServerError, error=error)
        else:
            try:
                self._logger.debug(f"Running {cmd} callback for request ID {reqid}")
                callback_entry._callback(CallbackState.OK, msg)
            except (sdk.exceptions.ConnectionError):
                raise
            except Exception as e:
                error = e

            if error is not None:
                tools.log_exception(self._logger, error, f"Callback raised an exception. cmd={cmd}, reqid={reqid}.")
                callback_entry._future._wt_mark_completed(CallbackState.CallbackError, error=error)
            elif callback_entry._future.state == CallbackState.Pending:
                # The callback may have completed the future itself; only complete it if still pending
                callback_entry._future._wt_mark_completed(CallbackState.OK)

        with self._main_lock:
            if reqid in self._callback_storage:
                del self._callback_storage[reqid]

    if cmd == API.Command.Api2Client.ERROR_RESPONSE:
        # Some request have pending requests based on reqid. they have multiple response per reqid. If any fail, cancel the request
        # Todo : Generalize the mechanism : 1 req -> multiple response by reqid.
        buckets: List[Mapping[int, PendingRequest]] = [
            self._pending_watchable_download_request,
            self._pending_sfd_download_requests,
            self._pending_sfd_upload_requests
        ]
        for bucket in buckets:
            try:
                req = bucket[reqid]
            except KeyError:
                continue
            req._mark_complete(False, failure_reason=str(msg.get('msg', "No error message provided")))
def _wt_process_write_watchable_requests(self) -> None:
    """Expire stale write batches, then drain the write queue into a single ``WRITE_WATCHABLE`` request and send it.

    First pass: every pending batch older than its timeout has its remaining requests failed and is
    removed; in the other batches, requests whose watchable is dead are failed; batches with no
    remaining request are pruned.

    Second pass: the write queue is read until it is empty, a ``FlushPoint`` is met or the maximum
    batch size is reached. Nothing is sent when no update was collected. The response callback
    registers the batch as pending on success and fails every request of the batch otherwise.

    :raises RuntimeError: If a ``BatchWriteContext`` is dequeued after other requests were already
        collected, or if the queue contains an unsupported element.
    """
    # Note _pending_api_batch_writes is always accessed from worker thread
    api_req = self._make_request(API.Command.Client2Api.WRITE_WATCHABLE, {'updates': []})
    api_req = cast(api_typing.C2S.WriteValue, api_req)

    # Clear old requests.
    # No need for lock here. The _request_queue crosses time domain boundaries
    now = time.perf_counter()
    if len(self._pending_api_batch_writes) > 0:
        tokens = list(self._pending_api_batch_writes.keys())
        for token in tokens:
            pending_batch = self._pending_api_batch_writes[token]
            if now - pending_batch.creation_perf_timestamp > pending_batch.timeout:
                for request in pending_batch.update_dict.values():  # Completed request are already removed of that dict.
                    request._mark_complete(False, f"Timed out ({pending_batch.timeout} seconds)")
                del self._pending_api_batch_writes[token]
            else:
                for request in pending_batch.update_dict.values():  # Completed request are already removed of that dict.
                    if request.watchable.is_dead:
                        request._mark_complete(False, f"{request.watchable.name} is not available anymore")

            # Once a batch is fully processed, meaning all requests have been treated and removed
            # We can prune the remaining empty batch.
            # The membership test avoids deleting a batch that the timeout branch just removed (KeyError).
            if len(pending_batch.update_dict) == 0 and token in self._pending_api_batch_writes:
                del self._pending_api_batch_writes[token]

    # Process new requests
    n = 0
    batch_dict: Dict[int, WriteRequest] = {}
    while True:
        obj = self._write_request_queue.get_or_none()
        if obj is None:
            break

        if isinstance(obj, FlushPoint):
            break   # Whatever was collected so far is sent as its own request

        requests: List[WriteRequest] = []
        batch_timeout = self._write_timeout
        if isinstance(obj, BatchWriteContext):
            if n != 0:
                raise RuntimeError("Missing FlushPoint before Batch")
            if len(obj.requests) > self._MAX_WRITE_REQUEST_BATCH_SIZE:
                for request in obj.requests:
                    request._mark_complete(False, "Batch too big")
                break
            requests = obj.requests
            batch_timeout = obj.timeout
        elif isinstance(obj, WriteRequest):
            requests = [obj]
        else:
            raise RuntimeError("Unsupported element in write queue")

        for request in requests:
            if n < self._MAX_WRITE_REQUEST_BATCH_SIZE:
                if request._watchable._configuration is not None:
                    api_req['updates'].append({
                        'batch_index': n,
                        'watchable': request._watchable.server_id,
                        'value': request._value
                    })
                    batch_dict[n] = request     # batch_index -> request, used to match the completions
                    n += 1
                else:
                    request._mark_complete(False, "Watchable has been made invalid")
            else:
                request._mark_complete(False, "Batch overflowed")   # Should never happen because we enforce n==0 on batch

        if n >= self._MAX_WRITE_REQUEST_BATCH_SIZE:
            break

    if len(api_req['updates']) == 0:
        return

    def _wt_write_watchable_response_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is not None and state == CallbackState.OK:
            confirmation = api_parser.parse_write_value_response(cast(api_typing.S2C.WriteValue, response))

            if confirmation.count != len(batch_dict):
                for request in batch_dict.values():
                    request._mark_complete(False, f"Count mismatch in request and server confirmation.")
            else:
                self._pending_api_batch_writes[confirmation.request_token] = PendingAPIBatchWrite(
                    update_dict=batch_dict,
                    confirmation=confirmation,
                    creation_perf_timestamp=time.perf_counter(),  # Used to prune the dict if no response after X time
                    timeout=batch_timeout
                )
        else:
            # The WriteRequest is different because we give a future object to the client and it's
            # to wait for the initial response. We do our own error message handling because we do not have access to the future object generated by send()
            error = f"[{state.name}] No error message provided"
            if state == CallbackState.ServerError and response is not None:
                msg = cast(api_typing.S2C.Error, response).get('msg', None)
                if msg is not None:
                    error = msg

            for request in batch_dict.values():
                request._mark_complete(False, error)

    self._send(api_req, _wt_write_watchable_response_callback, timeout=batch_timeout)
    # We don't need the future object here because the WriteRequest act as one.
def _wt_process_device_state(self) -> None:
    """Check the state of the device and take action when it changes.

    Compares the current server info with the one seen on the previous call and fires the
    device ready/gone, SFD loaded/unloaded and datalogging state events accordingly. The tracked
    watchables are invalidated when the device session changes (all of them) or when the loaded
    SFD changes (aliases and variables only). The current server info is then remembered for the
    next call.
    """
    if self._server_info is not None:
        # ==== Check Device conn
        if self._last_server_info is not None and self._last_server_info.device_session_id is not None:
            if self._last_server_info.device_session_id != self._server_info.device_session_id:  # New value or None
                # _wt_clear_all_watchables expects its caller to hold the main lock
                with self._main_lock:
                    self._wt_clear_all_watchables(ValueStatus.DeviceGone)
                self._trigger_event(self.Events.DeviceGoneEvent(session_id=self._last_server_info.device_session_id), loglevel=logging.INFO)
                if self._server_info.device_session_id is not None:
                    self._trigger_event(self.Events.DeviceReadyEvent(session_id=self._server_info.device_session_id), loglevel=logging.INFO)
        else:
            if self._server_info.device_session_id is not None:
                self._trigger_event(self.Events.DeviceReadyEvent(session_id=self._server_info.device_session_id), loglevel=logging.INFO)

        # ==== Check SFD
        if self._last_server_info is not None and self._last_server_info.sfd_firmware_id is not None:
            if self._server_info.sfd_firmware_id != self._last_server_info.sfd_firmware_id:   # None or new value
                # _wt_clear_all_watchables expects its caller to hold the main lock
                with self._main_lock:
                    self._wt_clear_all_watchables(ValueStatus.SFDUnloaded, [WatchableType.Alias, WatchableType.Variable])  # RPVs are still there.
                self._trigger_event(self.Events.SFDUnLoadedEvent(firmware_id=self._last_server_info.sfd_firmware_id), loglevel=logging.INFO)
                if self._server_info.sfd_firmware_id is not None:
                    self._trigger_event(self.Events.SFDLoadedEvent(firmware_id=self._server_info.sfd_firmware_id), loglevel=logging.INFO)
        else:
            if self._server_info.sfd_firmware_id is not None:
                self._trigger_event(self.Events.SFDLoadedEvent(firmware_id=self._server_info.sfd_firmware_id), loglevel=logging.INFO)

        # ==== Check datalogging
        if self._last_server_info is not None:
            if self._last_server_info.datalogging.state != self._server_info.datalogging.state:
                # Passage from/to NA are logged as debug only to keep the info log clean
                loglevel = logging.DEBUG if DataloggingState.NA in (
                    self._last_server_info.datalogging.state, self._server_info.datalogging.state) else logging.INFO
                self._trigger_event(self.Events.DataloggingStateChanged(self._server_info.datalogging), loglevel=loglevel)
            elif self._last_server_info.datalogging.completion_ratio != self._server_info.datalogging.completion_ratio:
                self._trigger_event(self.Events.DataloggingStateChanged(self._server_info.datalogging), loglevel=logging.DEBUG)
    else:
        # No server info anymore: whatever was known before is reported as gone
        if self._last_server_info is not None:
            if self._last_server_info.device_session_id is not None:
                self._trigger_event(self.Events.DeviceGoneEvent(session_id=self._last_server_info.device_session_id), loglevel=logging.INFO)
            if self._last_server_info.sfd_firmware_id is not None:
                self._trigger_event(self.Events.SFDUnLoadedEvent(firmware_id=self._last_server_info.sfd_firmware_id), loglevel=logging.INFO)

    self._last_server_info = self._server_info
def close_socket(self) -> None:
    """Forcefully try to close the socket, which cancels any pending connection or request.

    Any exception raised while doing so is suppressed.
    """
    # Does not respect _sock_lock on purpose.
    with tools.SuppressException():
        if self._sock is None:
            return
        self._connection_cancel_request = True  # Simply to mask the connection error we will cause
        self._sock.close()  # Try it. May fail, it's ok.
def _wt_disconnect(self) -> None:
    """Disconnect from a Scrutiny server, called by the Worker Thread.

    Does not throw an exception in case of broken pipe.

    The socket and selector are released under ``_sock_lock``. Then, under ``_main_lock``, the
    watchables and pending requests are invalidated, the connection state is reset and every
    callback still pending is marked as cancelled. The resulting device-gone, SFD-unloaded and
    disconnected events are fired last, once the locks are released.
    """
    # First attempt is done without the lock, to interrupt whatever may be using the socket
    self.close_socket()

    with self._sock_lock:
        if self._sock is not None:
            self._logger.debug(f"Disconnecting from server at {self._hostname}:{self._port}")
            try:
                self._sock.close()
            except socket.error as e:
                tools.log_exception(self._logger, e, "Failed to close the socket", str_level=logging.DEBUG)

        if self._selector is not None:
            self._selector.close()

        self._stream_parser.reset()
        self._sock = None
        self._selector = None

    # Events are collected while the state is still known and fired after the locks are released
    events_to_trigger: List[ScrutinyClient.Events._ANY_EVENTS] = []
    with self._main_lock:
        if self._last_server_info is not None:
            if self._last_server_info.device_session_id is not None:
                events_to_trigger.append(self.Events.DeviceGoneEvent(session_id=self._last_server_info.device_session_id))
            if self._last_server_info.sfd_firmware_id is not None:
                events_to_trigger.append(self.Events.SFDUnLoadedEvent(firmware_id=self._last_server_info.sfd_firmware_id))

        # A disconnected event is only reported when a connection was actually established
        if self._server_state == ServerState.Connected and self._hostname is not None and self._port is not None:
            events_to_trigger.append(self.Events.DisconnectedEvent(self._hostname, self._port))
        self._last_server_info = None

        with self._user_lock:    # Critical part, the user reads those properties
            self._wt_clear_all_watchables(ValueStatus.ServerGone)
            self._wt_clear_all_pending_requests("Server is disconnected")
            self._hostname = None
            self._port = None
            self._server_state = ServerState.Disconnected
            self._server_info = None

        # Nobody will answer these requests anymore
        for callback_entry in self._callback_storage.values():
            if callback_entry._future.state == CallbackState.Pending:
                callback_entry._future._wt_mark_completed(CallbackState.Cancelled)
        self._callback_storage.clear()

    for event in events_to_trigger:
        self._trigger_event(event, loglevel=logging.INFO)
def _wt_clear_all_watchables(self, new_status: ValueStatus, watchable_types: Optional[List[WatchableType]] = None) -> None:
    """Mark as dead and forget every tracked watchable whose type is in ``watchable_types``.

    The main lock is not taken here; the caller is expected to hold ``_main_lock`` already.

    :param new_status: The ``ValueStatus`` given to each removed watchable. Must not be ``ValueStatus.Valid``.
    :param watchable_types: The watchable types to clear. ``None`` means aliases, variables and RPVs alike.
    """
    assert new_status is not ValueStatus.Valid
    if watchable_types is None:
        watchable_types = [WatchableType.Alias, WatchableType.Variable, WatchableType.RuntimePublishedValue]

    # Work on a snapshot of the entries, the storage is mutated while we walk through it
    for server_id, handle in list(self._watchable_storage.items()):
        if handle.type not in watchable_types:
            continue
        handle._set_dead(new_status)
        self._watchable_path_to_id_map.pop(handle.server_path, None)
        del self._watchable_storage[server_id]
def _wt_clear_all_pending_requests(self, failure_reason: str) -> None:
    """Fail and forget every long-running request handle that was handed out to the user.

    The main lock is not taken here; the caller is expected to hold it already.

    :param failure_reason: The failure text recorded on each request being completed.
    """
    self._memory_read_completion_dict.clear()
    self._memory_write_completion_dict.clear()

    # Watchable list downloads, then datalogging requests
    for store in (self._pending_watchable_download_request, self._pending_datalogging_requests):
        for pending in store.values():
            pending._mark_complete(success=False, failure_reason=failure_reason)
        store.clear()

    # A batch write holds several individual write requests
    for batch in self._pending_api_batch_writes.values():
        for single_write in batch.update_dict.values():
            single_write._mark_complete(success=False, failure_reason=failure_reason)
    self._pending_api_batch_writes.clear()

    # SFD downloads, then SFD uploads
    for store in (self._pending_sfd_download_requests, self._pending_sfd_upload_requests):
        for pending in store.values():
            pending._mark_complete(False, failure_reason=failure_reason)
        store.clear()
def _register_callback(self, reqid: int, callback: ApiResponseCallback, timeout: float) -> ApiResponseFuture:
    """Store a callback entry under the given request ID and hand back the future attached to it.

    :param reqid: The request ID the entry is stored under.
    :param callback: The callable kept in the entry, alongside the future.
    :param timeout: The timeout, in seconds, recorded in the entry.
    :returns: The :class:`ApiResponseFuture` created for this request.
    """
    # The future waits a little longer than the request timeout, which leaves
    # the worker thread some margin to mark it as timed out
    wait_margin = 0.5
    response_future = ApiResponseFuture(reqid, default_wait_timeout=timeout + wait_margin)
    entry = CallbackStorageEntry(reqid=reqid, callback=callback, future=response_future, timeout=timeout)

    with self._main_lock:
        self._callback_storage[reqid] = entry

    return response_future
def _send(self,
          obj: api_typing.C2SMessage,
          callback: Optional[ApiResponseCallback] = None,
          timeout: Optional[float] = None
          ) -> Optional[ApiResponseFuture]:
    """Sends a message to the API. Return a future if a callback is specified. If no timeout is given, uses the default timeout value

    :param obj: The request dictionary to serialize as JSON and send.
    :param callback: Optional callback to register against the ``reqid`` of ``obj``.
    :param timeout: Timeout, in seconds, associated with the callback. Defaults to the client timeout.
    :returns: The future bound to ``callback``, or ``None`` when no callback is given.

    :raises TypeError: If ``obj`` is not a ``dict``.
    :raises RuntimeError: If a ``callback`` is provided but ``obj`` has no ``reqid`` field.
    :raises ConnectionError: If the socket is not open or a send error occurs.
    """
    error: Optional[Exception] = None
    future: Optional[ApiResponseFuture] = None

    if timeout is None:
        timeout = self._timeout

    if not isinstance(obj, dict):
        raise TypeError(f'ScrutinyClient only sends data under the form of a dictionary. Received {obj.__class__.__name__}')

    if callback is not None:
        if 'reqid' not in obj:
            raise RuntimeError("Missing reqid in request")
        future = self._register_callback(obj['reqid'], callback, timeout=timeout)
        if self._force_fail_request:
            # Simulated failure: the future is resolved right away and nothing is sent
            future._wt_mark_completed(CallbackState.SimulatedError, None)

    if not self._force_fail_request:
        with self._sock_lock:
            if self._sock is None or self._stream_maker is None:
                raise sdk.exceptions.ConnectionError("Disconnected from server")

            try:
                s = json.dumps(obj)
                if self._logger.isEnabledFor(DUMPDATA_LOGLEVEL):    # pragma: no cover
                    self._logger.log(DUMPDATA_LOGLEVEL, f"Sending {s}")
                data = self._stream_maker.encode(s.encode(self._encoding))
                # sendall() keeps writing until the whole datagram is out. send() may write
                # only a part of it, which would corrupt the stream seen by the server.
                self._sock.sendall(data)
                self._datarate_measurements.tx_data_rate.add_data(len(data))
                self._datarate_measurements.tx_message_rate.add_data(1)
            except socket.error as e:
                error = e
                self._logger.debug(traceback.format_exc())

    if error:
        # Handled once the socket lock is released
        self.disconnect()
        raise sdk.exceptions.ConnectionError(f"Disconnected from server. {error}") from error

    return future
def _wt_recv(self, timeout: Optional[float] = None) -> Generator[Dict[str, Any], None, None]:
    """Receive available data from the TCP socket, parse it, and yield each complete JSON message.

    A datagram that cannot be decoded (invalid text encoding or invalid JSON) is logged and dropped;
    the following datagrams are still processed.

    :param timeout: Maximum seconds to block waiting for socket data. Blocks indefinitely if ``None``.
    :raises ConnectionError: If the socket is closed or the server disconnects.
    """
    # No need to lock sock_lock here. Important is during disconnection
    error: Optional[Exception] = None
    obj: Optional[Dict[str, Any]] = None

    if self._sock is None or self._selector is None:
        raise sdk.exceptions.ConnectionError("Disconnected from server")

    server_gone = False
    try:
        events = self._selector.select(timeout)
        for key, _ in events:
            assert key.fileobj is self._sock
            data = self._sock.recv(4096)
            if not data:
                # An empty read means that the peer closed the connection
                server_gone = True
            else:
                self._datarate_measurements.rx_data_rate.add_data(len(data))
                self._stream_parser.parse(data)
    except socket.error as e:
        server_gone = True
        error = e
        self._logger.debug(traceback.format_exc())

    if server_gone:
        self._wt_disconnect()
        err_str = str(error) if error else ""
        raise sdk.exceptions.ConnectionError(f"Disconnected from server. {err_str}")

    while not self._stream_parser.queue().empty():
        try:
            data_bytes = self._stream_parser.queue().get_or_none()
            if data_bytes is None:
                break
            data_str = data_bytes.decode(self._encoding)
            if self._logger.isEnabledFor(DUMPDATA_LOGLEVEL):    # pragma: no cover
                self._logger.log(DUMPDATA_LOGLEVEL, f"Received: {data_str}")
            obj = json.loads(data_str)
            if obj is not None:
                self._datarate_measurements.rx_message_rate.add_data(1)
                yield obj
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # A bad text encoding is treated like bad JSON: drop the datagram, keep the connection
            self._logger.error(f"Received malformed JSON from the server. {e}")
            self._logger.debug(traceback.format_exc())
def _make_request(self, command: str, data: Optional[Dict[str, Any]] = None) -> api_typing.C2SMessage:
    """Create a client-to-server request dictionary carrying a fresh request ID.

    The request ID counter increments on each call and goes back to 0 once it reaches ``2**32 - 1``.

    :param command: The API command string (e.g. ``API.Command.Client2Api.GET_SERVER_STATUS``).
    :param data: Optional extra fields for the request. The given dictionary is copied, not modified.
    :returns: A :class:`C2SMessage` dictionary ready to be passed to :meth:`_send`.
    """
    with self._main_lock:
        reqid = self._reqid
        following_reqid = reqid + 1
        self._reqid = 0 if following_reqid >= 2**32 - 1 else following_reqid

    header: api_typing.BaseC2SMessage = {
        'cmd': command,
        'reqid': reqid
    }

    request: Dict[str, Any] = {} if data is None else data.copy()
    request.update(header)  # 'cmd' and 'reqid' take precedence over the user fields
    return cast(api_typing.C2SMessage, request)
def _enqueue_write_request(self, request: Union[WriteRequest, BatchWriteContext, FlushPoint]) -> None:
    """Append an item to the internal write queue.

    :param request: The write request, batch context or flush point to put on the queue.
    """
    write_queue = self._write_request_queue
    write_queue.put(request)
def __del__(self) -> None:
    """Finalizer: disconnect from the server when the client object is garbage-collected"""
    self.disconnect()
def _is_batch_write_in_progress(self) -> bool:
    """Tell whether a batch write is being built, i.e. whether an active batch context is set"""
    active_context = self._active_batch_context
    return active_context is not None
def _process_write_request(self, request: WriteRequest) -> None:
    """Dispatch a write request: queue it right away, or hold it in the active batch if there is one.

    :param request: The :class:`WriteRequest` to process.
    """
    if not self._is_batch_write_in_progress():
        self._enqueue_write_request(request)
        return

    # A batch is being built: the request is kept with the batch
    assert self._active_batch_context is not None
    self._active_batch_context.requests.append(request)
def _cancel_download_watchable_list_request(self, reqid: int) -> None:
    """Mark a pending watchable-list download as failed with the reason "Cancelled by user".

    :param reqid: The request ID that identifies the download.
    :raises OperationFailure: If no pending download is stored under that request ID.
    """
    pending_downloads = self._pending_watchable_download_request
    try:
        download_request = pending_downloads[reqid]
    except KeyError:
        raise sdk.exceptions.OperationFailure(f"No download request identified by request ID : {reqid}")

    download_request._mark_complete(success=False, failure_reason="Cancelled by user")
def _cancel_download_sfd_request(self, reqid: int) -> None:
    """Mark a pending SFD download as failed with the reason "Cancelled by user".

    :param reqid: The request ID that identifies the download.
    :raises OperationFailure: If no pending SFD download is stored under that request ID.
    """
    pending_downloads = self._pending_sfd_download_requests
    try:
        download_request = pending_downloads[reqid]
    except KeyError:
        raise sdk.exceptions.OperationFailure(f"No download request identified by request ID : {reqid}")

    download_request._mark_complete(success=False, failure_reason="Cancelled by user")
def _cancel_upload_sfd_request(self, init_reqid: int) -> None:
    """Mark a pending SFD upload as failed with the reason "Cancelled by user".

    :param init_reqid: The request ID the upload request is stored under.
    :raises OperationFailure: If no pending SFD upload is stored under that request ID.
    """
    pending_uploads = self._pending_sfd_upload_requests
    try:
        upload_request = pending_uploads[init_reqid]
    except KeyError:
        raise sdk.exceptions.OperationFailure(f"No upload request identified by request ID : {init_reqid}")

    upload_request._mark_complete(success=False, failure_reason="Cancelled by user")
def _start_upload_sfd(self, init_reqid: int) -> None:
    """Start sending data chunk to the server, called from the user thread

    The file of the pending upload request is read chunk by chunk and sent from a daemon thread
    started by this method. Each chunk is sent as a base64 string in its own ``UPLOAD_SFD_DATA`` request.

    :param init_reqid: The request ID the pending upload request is stored under.
    :raises OperationFailure: If no upload request with the given ``init_reqid`` is active.
    """
    # NOTE(review): half the stream MTU minus 100 presumably leaves room for the base64 expansion
    # and the JSON envelope - confirm against the server limits.
    chunk_size = max(TCPClientHandler.STREAM_MTU // 2 - 100, 256)
    if self._UNITTEST_DOWNLOAD_CHUNK_SIZE is not None:
        chunk_size = self._UNITTEST_DOWNLOAD_CHUNK_SIZE

    try:
        req = self._pending_sfd_upload_requests[init_reqid]
    except KeyError:
        raise sdk.exceptions.OperationFailure(f"No upload request identified by request ID : {init_reqid}")

    filesize = os.stat(req.abs_filepath).st_size

    def upload_thread_fn() -> None:
        try:
            with open(req.abs_filepath, 'rb') as f:
                chunk_index = 0
                byte_counter = 0
                is_last_chunk = False
                while not is_last_chunk and not req.completed:    # checking req.completed will exit on cancel or server error
                    data_chunk = f.read(chunk_size)
                    if len(data_chunk) == 0:
                        break
                    byte_counter += len(data_chunk)
                    is_last_chunk = (byte_counter >= filesize)

                    msg = cast(api_typing.C2S.UploadSFDData,
                               self._make_request(API.Command.Client2Api.UPLOAD_SFD_DATA, {
                                   'token': req._upload_token,
                                   'file_chunk': {
                                       'chunk_index': chunk_index,
                                       'data': b64encode(data_chunk).decode('ascii')
                                   }
                               })
                               )
                    chunk_index += 1
                    reqid = msg['reqid']
                    with self._main_lock:
                        # Many reqid points to the same request object. it's fine. Makes the design cleaner and less convoluted
                        with tools.SuppressException(KeyError):
                            del self._pending_sfd_upload_requests[reqid]
                            # Only reached when an entry did exist under this data request ID
                            self._logger.error(f"An upload request already existed for data request ID: {reqid}")
                        self._pending_sfd_upload_requests[reqid] = req
                    self._send(msg)
        except Exception as e:
            # Any failure in the thread completes the request with an error, then gets logged
            req._mark_complete(False, f"Error while uploading: {e}")
            tools.log_exception(self._logger, e, "Error in SFD upload thread")

    upload_thread = threading.Thread(target=upload_thread_fn, daemon=True)
    upload_thread.start()
def _flush_batch_write(self, batch_write_context: BatchWriteContext) -> None:
    """Submit a batch: a :class:`FlushPoint` is queued first, then the :class:`BatchWriteContext` itself.

    :param batch_write_context: The batch context to queue.
    """
    # Flush Point required because Python thread-safe queue has no peek() method.
    flush_point = FlushPoint()
    self._enqueue_write_request(flush_point)
    self._enqueue_write_request(batch_write_context)
def _end_batch(self) -> None:
    """Close the batch-write session by dropping the reference to the active batch context"""
    self._active_batch_context = None
def _wait_write_batch_complete(self, batch: BatchWriteContext) -> None:
    """Wait for each write request of the batch, all of them sharing the single ``batch.timeout`` time budget.

    :param batch: The :class:`BatchWriteContext` to wait on.
    :raises TimeoutException: If a wait times out and at least one write request of the batch is not completed.
    """
    t0 = time.monotonic()
    timed_out = False
    try:
        for write_request in batch.requests:
            elapsed = time.monotonic() - t0
            write_request.wait_for_completion(timeout=max(0, batch.timeout - elapsed))
    except sdk.exceptions.TimeoutException:
        timed_out = True

    if not timed_out:
        return

    # A wait expired. Report how many requests of the batch are still not done.
    incomplete_count = sum(1 for request in batch.requests if not request.completed)
    if incomplete_count > 0:
        raise sdk.exceptions.TimeoutException(
            f"Incomplete batch write. {incomplete_count} write requests not completed in {batch.timeout} sec. ")
def _change_update_rate(self, handle: WatchableHandle, update_rate: Optional[float] = None) -> Optional[float]:
    """Ask the server to change the subscription update rate of a watched element and wait for the answer.

    On success, the requested rate is recorded on the handle.

    :param handle: The handle of the watched element.
    :param update_rate: The requested update rate, validated with a lower bound of 1 when not ``None``.
    :returns: The effective rate read from the server response.
    :raises OperationFailure: If the request does not complete with an ``OK`` state.
    """
    validation.assert_type(handle, 'handle', WatchableHandle)
    update_rate = validation.assert_float_range_if_not_none(update_rate, 'update_rate', 1)

    effective_rate: Optional[float] = None  # Written by the callback below

    def wt_change_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        nonlocal effective_rate
        if response is None or state != CallbackState.OK:
            return
        response = cast(api_typing.S2C.ChangeSubscriptionUpdateRate, response)
        changes = api_parser.parse_change_update_rate_response(response)
        if len(changes) != 1:
            raise sdk.exceptions.BadResponseError(f"Expected 1 rate value in reposne, got {len(changes)}")
        server_id, reported_rate = next(iter(changes.items()))
        if server_id != handle.server_id:
            raise sdk.exceptions.BadResponseError("Received an effective update rate for the wrong watchable.")
        effective_rate = reported_rate

    changes_field = [
        {'id': handle.server_id, 'rate': update_rate}   # Single element
    ]
    req = self._make_request(API.Command.Client2Api.CHANGE_SUBSCRIPTION_UPDATE_RATE, {
        'changes': changes_field,
    })

    future = self._send(req, wt_change_callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(f"Failed to change the watchable update rate. {future.error_str}")

    handle._set_requested_update_rate(update_rate)  # Keep last value
    return effective_rate
# === User API ====
[docs]
def connect(self, hostname: str, port: int, wait_status: bool = True) -> "ScrutinyClient":
"""Connect to a Scrutiny server through a TCP socket.
Any existing connection is closed first. Once the socket is connected, the call blocks until the
Welcome message of the server is received, then optionally until a server status is available.
:param hostname: The hostname or IP address of the server
:param port: The listening port of the server
:param wait_status: Wait for a server status update after the socket connection is established.
Ensure that a value is available when calling :meth:`get_latest_server_status()<get_latest_server_status>`
:raises ConnectionError: In case of failure
:raises TimeoutException: If the Welcome message is not received within the client timeout, or if
``wait_status`` is set and no server status is received in time
:return: This client instance
"""
# Start from a clean state: drop any previous connection
self.disconnect()
self._locked_for_connect = True
self._connection_cancel_request = False
self._threading_events.welcome_received.clear()
with self._main_lock:
self._hostname = hostname
self._port = port
# The socket error, if any, is kept and reported after the connection attempt
connect_error: Optional[Exception] = None
self._logger.debug(f"Connecting to {hostname}:{port}")
with self._sock_lock:
try:
self._server_state = ServerState.Connecting
self._stream_parser.reset()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Disable Nagle's algorithm
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0)) # Abortive close
# The selector is registered for read events on the new socket
self._selector = selectors.DefaultSelector()
self._selector.register(self._sock, selectors.EVENT_READ)
self._sock.connect((hostname, port))
self._trigger_event(self.Events.ConnectedEvent(self._hostname, self._port), loglevel=logging.INFO)
self._server_state = ServerState.Connected
self._start_worker_thread()
except socket.error as e:
# Connect may fail in many way.
# The user can close the socket to unblock the connection thread.
self._logger.debug(traceback.format_exc())
connect_error = e
self._locked_for_connect = False
if connect_error is not None:
# Clean up the half-open connection before reporting the failure
self.disconnect()
raise sdk.exceptions.ConnectionError(f'Failed to connect to the server at "{hostname}:{port}". Error: {connect_error}')
# The connection is only considered usable once the Welcome message has been received
self._threading_events.welcome_received.wait(self._timeout)
if not self._threading_events.welcome_received.is_set():
self.disconnect()
raise sdk.exceptions.TimeoutException(f'Did not receive a Welcome message from the server. Timeout={self._timeout}s')
if wait_status:
# Same logic as wait_server_status_update(), but without clearing the flag since we want at least 1 update.
timeout = self._UPDATE_SERVER_STATUS_INTERVAL + 2
self._threading_events.server_status_updated.wait(timeout=timeout)
if not self._threading_events.server_status_updated.is_set():
raise sdk.exceptions.TimeoutException(f"Server status did not update within a {timeout} seconds delay")
return self
[docs]
def disconnect(self) -> None:
    """Disconnect from the server"""
    worker = self._worker_thread
    if worker is None or not worker.is_alive():
        # No running worker thread: can call safely from this thread
        self._wt_disconnect()
        return

    # A worker thread is running: close the socket, ask the thread to disconnect and wait for it
    self.close_socket()
    thread_events = self._threading_events
    thread_events.disconnected.clear()
    thread_events.disconnect.set()
    thread_events.disconnected.wait(timeout=2)  # Timeout avoid race condition if the thread was exiting
    self._stop_worker_thread()
[docs]
def listen_events(self, enabled_events: int, disabled_events: int = 0) -> None:
    """Select which events are to be listen for when calling :meth:`read_event<read_event>`.

    :param enabled_events: A flag value constructed by ORing values from :class:`ScrutinyClient.Events<scrutiny.sdk.client.ScrutinyClient.Events>`
    :param disabled_events: A flag value, built the same way, of the events to remove from ``enabled_events``.
        Handy to listen to everything but a few events.

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: If the flag value is negative
    """
    validation.assert_int_range(enabled_events, 'enabled_events', minval=0)
    validation.assert_int_range(disabled_events, 'disabled_events', minval=0)

    # Mask out the disabled bits instead of XORing them: an unknown bit given in disabled_events
    # must never end up being enabled. The result is always a subset of LISTEN_ALL.
    self._enabled_events = enabled_events & self.Events.LISTEN_ALL & ~disabled_events
[docs]
def try_get_existing_watch_handle_by_server_id(self, server_id: str) -> Optional[WatchableHandle]:
    """Look up the handle of an element being watched, using the unique server_id assigned by :meth:`watch()<watch>`.

    Nothing is sent to the server, the call does not block.

    :param server_id: The server_id assigned to the handle returned by :meth:`watch()<watch>`

    :raises TypeError: Given parameter not of the expected type

    :return: A handle that can read/write the watched element or ``None`` if the element is not being watched.
    """
    validation.assert_type(server_id, 'server_id', str)
    # No need to lock. This is atomic
    return self._watchable_storage.get(server_id)
[docs]
def try_get_existing_watch_handle(self, path: str) -> Optional[WatchableHandle]:
    """Look up the handle of an element being watched, using its path.

    Nothing is sent to the server, the call does not block.

    :param path: The path of the element being watched

    :raises TypeError: Given parameter not of the expected type

    :return: A handle that can read/write the watched element or ``None`` if the element is not being watched.
    """
    validation.assert_type(path, 'path', str)

    with self._main_lock:
        # Two steps lookup: path -> server ID -> handle
        server_id = self._watchable_path_to_id_map.get(path)
        if server_id is None:
            return None
        return self._watchable_storage.get(server_id)
[docs]
def watch(self, path: str, update_rate: Optional[float] = None) -> WatchableHandle:
    """Starts watching a watchable element identified by its display path (tree-like path)

    If the path is already being watched, the existing handle is returned and no request is sent.

    :param path: The path of the element to watch
    :param update_rate: The update rate at which the server should update this value.
        The actual rate may be faster if another client requires a higher update frequency
        A value of ``None`` indicates that updates should occur as quickly as possible.
        This rate applies to the server/device communication while :meth:`set_server_throttling<scrutiny.sdk.client.ScrutinyClient.set_server_throttling>`
        controls the server/client communication

    :raises OperationFailure: If the watch request fails to complete
    :raises TypeError: Given parameter not of the expected type

    :return: A handle that can read/write the watched element.
    """
    validation.assert_type(path, 'path', str)
    update_rate = validation.assert_float_range_if_not_none(update_rate, 'update_rate', minval=1)

    existing_handle = self.try_get_existing_watch_handle(path)
    if existing_handle:
        return existing_handle

    new_handle = WatchableHandle(self, path, update_rate)

    def wt_subscribe_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        response = cast(api_typing.S2C.SubscribeWatchable, response)
        watchable_defs = api_parser.parse_subscribe_watchable_response(response)
        if len(watchable_defs) != 1:
            raise sdk.exceptions.BadResponseError(
                f'The server did confirm the subscription of {len(watchable_defs)} while we requested only for 1')
        if path not in watchable_defs:
            raise sdk.exceptions.BadResponseError(
                f'The server did not confirm the subscription for the right watchable. Got {list(watchable_defs.keys())[0]}, expected {path}')

        new_handle._configure(watchable_defs[path])
        assert new_handle._configuration is not None
        # The handle becomes discoverable by path and by server ID
        with self._main_lock:
            self._watchable_path_to_id_map[new_handle.server_path] = new_handle._configuration.server_id
            self._watchable_storage[new_handle._configuration.server_id] = new_handle

    subscribe_fields = {
        'watchables': [new_handle.server_path],     # Single element
        'rate': update_rate
    }
    req = self._make_request(API.Command.Client2Api.SUBSCRIBE_WATCHABLE, subscribe_fields)

    future = self._send(req, wt_subscribe_callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(f"Failed to subscribe to the watchable. {future.error_str}")

    new_handle._assert_configured()
    assert new_handle._configuration is not None    # To please mypy
    if self._logger.isEnabledFor(logging.DEBUG):
        self._logger.debug(f"Now watching {new_handle.server_path} (Server ID={new_handle._configuration.server_id})")

    return new_handle
def unwatch(self, watchable_ref: Union[str, WatchableHandle]) -> None:
    """Stop watching a watchable element. Marks the handle as "dead".
    See :attr:`is_dead<scrutiny.sdk.watchable_handle.WatchableHandle.is_dead>`

    :param watchable_ref: The tree-like path of the watchable element or the handle to it

    :raises TypeError: Given parameter not of the expected type
    :raises NameNotFoundError: If the required path is not presently being watched
    :raises OperationFailure: If the subscription cancellation failed in any way
    """
    validation.assert_type(watchable_ref, 'watchable_ref', (str, WatchableHandle))
    path = watchable_ref.server_path if isinstance(watchable_ref, WatchableHandle) else watchable_ref

    # Find the tracked handle: path -> server ID -> handle
    watchable: Optional[WatchableHandle] = None
    with self._main_lock:
        if path in self._watchable_path_to_id_map:
            server_id = self._watchable_path_to_id_map[path]
            if server_id in self._watchable_storage:
                watchable = self._watchable_storage[server_id]

    if watchable is None:
        raise sdk.exceptions.NameNotFoundError(f"Cannot unwatch {path} as it is not being watched.")

    def wt_unsubscribe_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK or watchable is None:
            return
        response = cast(api_typing.S2C.UnsubscribeWatchable, response)
        if len(response['unsubscribed']) != 1:
            raise sdk.exceptions.BadResponseError(
                f'The server did cancel the subscription of {len(response["unsubscribed"])} while we requested only for 1')
        if response['unsubscribed'][0] != watchable.server_path:
            raise sdk.exceptions.BadResponseError(
                f'The server did not cancel the subscription for the right watchable. Got {response["unsubscribed"][0]}, expected {watchable.server_path}')

    req = self._make_request(API.Command.Client2Api.UNSUBSCRIBE_WATCHABLE, {
        'watchables': [
            watchable.server_path
        ]
    })

    future = self._send(req, wt_unsubscribe_callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(f"Failed to unsubscribe to the watchable. {future.error_str}")

    # The server confirmed: stop tracking the handle, then mark it dead
    with self._main_lock:
        if watchable.server_path in self._watchable_path_to_id_map:
            del self._watchable_path_to_id_map[watchable.server_path]

        configuration = watchable._configuration
        if configuration is not None and configuration.server_id in self._watchable_storage:
            del self._watchable_storage[configuration.server_id]

    watchable._set_dead(ValueStatus.NotWatched)
    if self._logger.isEnabledFor(logging.DEBUG):
        self._logger.debug(f"Done watching {watchable.server_path}")
[docs]
def get_watchable_info(self, paths: List[str]) -> Dict[str, DetailedWatchableConfiguration]:
    """Request the server for details about a list of watchables.
    The information returned is the same as one would have gotten after a call to :meth:`watch<scrutiny.sdk.client.ScrutinyClient.watch>`,
    without actually subscribing to the server for updates.

    :param paths: The server path of every watchable to query

    :return: A dictionary mapping server path and detailed info structure

    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the request fails in any way
    """
    validation.assert_type(paths, 'paths', list)
    for index, requested_path in enumerate(paths):
        validation.assert_type(requested_path, f'paths[{index}]', str)

    received_defs: Optional[Dict[str, DetailedWatchableConfiguration]] = None   # Written by the callback below

    def wt_get_info_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        nonlocal received_defs
        if response is None or state != CallbackState.OK:
            return
        response = cast(api_typing.S2C.GetWatchableInfo, response)
        watchable_defs = api_parser.parse_get_watchable_info_response(response)
        if len(watchable_defs) != len(paths):
            raise sdk.exceptions.BadResponseError(
                f'The server returned the info for {len(watchable_defs)} watchables while we requested only for {len(paths)}')
        for path in paths:
            if path not in watchable_defs:
                raise sdk.exceptions.BadResponseError(f'The server did not return the information of {path}')
        received_defs = watchable_defs

    req = self._make_request(API.Command.Client2Api.GET_WATCHABLE_INFO, {
        'watchables': paths
    })

    future = self._send(req, wt_get_info_callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK or received_defs is None:
        raise sdk.exceptions.OperationFailure(f"Failed to get the watchable info. {future.error_str}")

    return received_defs
[docs]
def get_var_watchable_info(self, path: str) -> DetailedVarWatchableConfiguration:
    """Query a single watchable through :meth:`get_watchable_info<scrutiny.sdk.client.ScrutinyClient.get_watchable_info>`
    and make sure it is of type :attr:`Variable<scrutiny.sdk.WatchableType.Variable>`.

    :param path: Server path to the watchable

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: If the request fails in any way
    :raises BadTypeError: If the requested watchable is not an variable
    """
    info = self.get_watchable_info([path])[path]
    if isinstance(info, DetailedVarWatchableConfiguration):
        return info
    raise sdk.exceptions.BadTypeError(f"Watchable {path} is not a variable. Got {info.watchable_type.name}")
[docs]
def get_alias_watchable_info(self, path: str) -> DetailedAliasWatchableConfiguration:
    """Query a single watchable through :meth:`get_watchable_info<scrutiny.sdk.client.ScrutinyClient.get_watchable_info>`
    and make sure it is of type :attr:`Alias<scrutiny.sdk.WatchableType.Alias>`.

    :param path: Server path to the watchable

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: If the request fails in any way
    :raises BadTypeError: If the requested watchable is not an alias
    """
    info = self.get_watchable_info([path])[path]
    if isinstance(info, DetailedAliasWatchableConfiguration):
        return info
    raise sdk.exceptions.BadTypeError(f"Watchable {path} is not an alias. Got {info.watchable_type.name}")
[docs]
def get_rpv_watchable_info(self, path: str) -> DetailedRPVWatchableConfiguration:
    """Query a single watchable through :meth:`get_watchable_info<scrutiny.sdk.client.ScrutinyClient.get_watchable_info>`
    and make sure it is of type :attr:`RPV<scrutiny.sdk.WatchableType.RuntimePublishedValue>`.

    :param path: Server path to the watchable

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: If the request fails in any way
    :raises BadTypeError: If the requested watchable is not a Runtime Published Value
    """
    info = self.get_watchable_info([path])[path]
    if isinstance(info, DetailedRPVWatchableConfiguration):
        return info
    raise sdk.exceptions.BadTypeError(f"Watchable {path} is not a RPV. Got {info.watchable_type.name}")
[docs]
def wait_new_value_for_all(self, timeout: float = 5) -> None:
    """Wait for all watched elements to be updated at least once after the call to this method

    The watched elements are waited on one after the other; ``timeout`` is a single time budget shared by all of them.

    :param timeout: Amount of time to wait for the update

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises InvalidValueError: If a watched element becomes invalid while waiting
    :raises TimeoutException: If not all watched elements gets updated in time
    """
    timeout = validation.assert_float_range(timeout, 'timeout', minval=0)

    with self._main_lock:
        handles = self._watchable_storage.copy()    # Shallow copy
        # Update counters as they are right now. An update is a change from these values.
        initial_counters: Dict[str, Optional[int]] = {
            server_id: handle._update_counter for server_id, handle in handles.items()
        }

    t0 = time.monotonic()
    for server_id, handle in handles.items():
        remaining = max(round(timeout - (time.monotonic() - t0), 2), 0)
        # Wait update will throw if the server has gone away as the _disconnect method will set all watchables "invalid"
        handle.wait_update(previous_counter=initial_counters[server_id], timeout=remaining)
[docs]
def wait_server_status_update(self, timeout: float = _UPDATE_SERVER_STATUS_INTERVAL + 0.5) -> None:
    """Wait for the server to broadcast a status update. Happens periodically

    The "server status updated" flag is cleared first, so only an update that comes after this call counts.

    :param timeout: Amount of time to wait for the update

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises TimeoutException: Server status update did not occurred within the timeout time
    """
    timeout = validation.assert_float_range(timeout, 'timeout', minval=0)

    status_updated = self._threading_events.server_status_updated
    status_updated.clear()
    status_updated.wait(timeout=timeout)

    if status_updated.is_set():
        return
    raise sdk.exceptions.TimeoutException(f"Server status did not update within a {timeout} seconds delay")
[docs]
def request_server_status_update(self, wait: bool = False, timeout: Optional[float] = None) -> Optional[sdk.ServerInfo]:
    """Ask the server for an immediate status update instead of waiting for the periodic one.

    :param wait: Wait for the response if ``True``
    :param timeout: Amount of time to wait for the update. Has no effect if ``wait=False``. Use the SDK default timeout if ``None``

    :raises OperationFailure: Failed to get the server status
    :raises TimeoutException: Server status update did not occur within the timeout time

    :return: The server status if ``wait=True``, ``None`` otherwise
    """
    self._send(self._make_request(API.Command.Client2Api.GET_SERVER_STATUS))
    if not wait:
        return None

    # NOTE(review): the status event is cleared inside wait_server_status_update(), i.e. after the request
    # has been sent. A response arriving in between would presumably be missed - confirm this is acceptable.
    if timeout is None:
        # Let wait_server_status_update() apply its own default timeout
        self.wait_server_status_update()
    else:
        self.wait_server_status_update(timeout=timeout)
    return self.get_latest_server_status()
[docs]
def wait_device_ready(self, timeout: float) -> None:
    """Wait for a device to be connected to the server and to have finished its handshake.

    The latest server status is inspected after each status update until the device communication
    state is ``ConnectedReady`` or the time budget is exhausted.

    :param timeout: Amount of time to wait for the device

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises ConnectionError: If the connection to the server is lost while waiting
    :raises InvalidValueError: If no server status is available
    :raises TimeoutException: If the device does not become ready within the required timeout
    """
    timeout = validation.assert_float_range(timeout, 'timeout', minval=0)

    wait_start = time.perf_counter()
    while True:
        status = self.get_latest_server_status()
        if status is not None and status.device_comm_state == sdk.DeviceCommState.ConnectedReady:
            return

        elapsed = time.perf_counter() - wait_start
        try:
            self.wait_server_status_update(max(timeout - elapsed, 0))
        except sdk.exceptions.TimeoutException:
            pass    # Reported below, outside of the handler, so the new exception is not chained to this one
        else:
            continue    # A status update was received in time, look at it

        raise sdk.exceptions.TimeoutException(f'Device did not become ready within {timeout}s')
[docs]
def batch_write(self, timeout: Optional[float] = None) -> BatchWriteContext:
    """Start a batch write. Write operations will be enqueued and committed together.
    Every write is guaranteed to be executed in the right order.

    :param timeout: Amount of time to wait for the completion of the batch once committed. If ``None`` the default write timeout
        will be used.

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: Failed to complete the batch write

    :return: The batch context, which also becomes the active batch context of this client
    """
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)

    # Only one batch can be active at a time
    if self._active_batch_context is not None:
        raise sdk.exceptions.OperationFailure("Batch write cannot be nested")

    effective_timeout = self._write_timeout if timeout is None else timeout
    context = BatchWriteContext(self, effective_timeout)
    self._active_batch_context = context
    return context
[docs]
def get_installed_sfds(self) -> Dict[str, sdk.SFDInfo]:
    """Get the list of Scrutiny Firmware Description files installed on the server.

    :raises OperationFailure: Failed to get the SFD list

    :return: A dictionary mapping firmware IDs (hash) to a :class:`SFDInfo<scrutiny.sdk.SFDInfo>` structure
    """
    request = self._make_request(API.Command.Client2Api.GET_INSTALLED_SFD)

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand its result back to this scope
        sfds: Optional[Dict[str, sdk.SFDInfo]]

    holder: ResultHolder = ResultHolder(sfds=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        holder.sfds = api_parser.parse_get_installed_sfds_response(cast(api_typing.S2C.GetInstalledSFD, response))

    future = self._send(request, on_response)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK and holder.sfds is not None:
        return holder.sfds

    raise sdk.exceptions.OperationFailure(
        f"Failed to get the list of Scrutiny Firmware Description file installed on the server. {future.error_str}")
[docs]
def uninstall_sfds(self, firmware_id_list: List[str]) -> None:
    """Uninstall a list of Scrutiny Firmware Description (SFD) from the server.

    :param firmware_id_list: The list of firmware IDs. Each should be a 128 bits hexadecimal string (32 chars)

    :raises OperationFailure: Failed to uninstall the given SFDs
    :raises TypeError: Given parameter not of the expected type
    """
    validation.assert_type(firmware_id_list, 'firmware_id_list', list)
    for index, firmware_id in enumerate(firmware_id_list):
        validation.assert_type(firmware_id, f'firmware_id_list[{index}]', str)

    request = self._make_request(
        API.Command.Client2Api.UNINSTALL_SFD,
        {'firmware_id_list': firmware_id_list}
    )

    def ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        pass

    # Empty callback is necessary to get a future object.
    future = self._send(request, ignore_response)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK:
        return

    raise sdk.exceptions.OperationFailure(
        f"Failed to uninstall the list of Scrutiny Firmware Description file. {future.error_str}")
[docs]
def download_sfd(self, firmware_id: str) -> SFDDownloadRequest:
    """Download a Scrutiny Firmware Description file from the server.

    The request handle is registered among the pending downloads before the request is sent.

    :param firmware_id: A 32 char hex string that matches the wanted SFD firmware ID

    :return: A handle on the request that gives the status of the download and can be waited on

    :raises TypeError: Given parameter not of the expected type
    """
    validation.assert_type(firmware_id, 'firmware_id', str)

    request = cast(
        api_typing.C2S.DownloadSFD,
        self._make_request(API.Command.Client2Api.DOWNLOAD_SFD, {'firmware_id': firmware_id})
    )

    # For unit tests, we want to validate that multi chunk works
    unittest_chunk_size = self._UNITTEST_DOWNLOAD_CHUNK_SIZE
    if unittest_chunk_size is not None:
        request['max_chunk_size'] = unittest_chunk_size

    request_id = request['reqid']
    download_handle = SFDDownloadRequest(self, firmware_id, request_id)
    with self._main_lock:
        self._pending_sfd_download_requests[request_id] = download_handle

    self._send(request)
    return download_handle
[docs]
def init_sfd_upload(self, filepath: Union[str, Path]) -> SFDUploadRequest:
    """Initialize the upload and install process of a Scrutiny Firmware Description (SFD) file to the server.

    Calling this method will not transfer the data: calling :meth:`SFDUploadRequest.start()<scrutiny.sdk.client.SFDUploadRequest.start>` on the returned
    object is required to start the file transfer.

    :param filepath: The path to the SFD file to upload and install

    :return: A handle on the request that gives the status of the upload and can be waited on

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given file is invalid or too big
    :raises FileNotFoundError: If ``filepath`` does not point to an existing file.
    :raises OperationFailure: Failed to initialize the upload, the server may have denied it
    """
    validation.assert_type(filepath, 'filepath', (str, Path))
    filepath = Path(filepath)
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"File {filepath} does not exist")

    try:
        firmware_id = FirmwareDescription.read_firmware_id_from_sfd_file(str(filepath)).hex()
    except Exception as e:
        # Chain the original error so the root cause stays visible in the traceback
        raise ValueError(f"Invalid SFD file. {e}") from e

    filesize = os.stat(filepath).st_size
    if filesize > API.SFD_MAX_UPLOAD_SIZE:
        raise ValueError(f"File too big. Maximum size {API.SFD_MAX_UPLOAD_SIZE}")

    @dataclass(slots=True)
    class Container:
        # Mutable box that lets the callback hand the parsed response back to this scope
        obj: Optional[api_parser.UploadSFDInitResponse]

    cb_data = Container(None)

    def upload_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is not None and state == CallbackState.OK:
            cb_data.obj = api_parser.parse_upload_sfd_init_response(cast(api_typing.S2C.UploadSFDInit, response))

    req = cast(api_typing.C2S.UploadSFDInit,
               self._make_request(API.Command.Client2Api.UPLOAD_SFD_INIT, {
                   'firmware_id': firmware_id,
                   'total_size': filesize
               })
               )

    future = self._send(req, upload_callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK or cb_data.obj is None:
        raise sdk.exceptions.OperationFailure(f"Failed to upload SFD. {future.error_str}")

    init_reqid = req['reqid']
    upload_request = SFDUploadRequest(
        client=self,
        firmware_id=firmware_id,
        upload_token=cb_data.obj.token,
        will_overwrite=cb_data.obj.will_overwrite,
        filepath=filepath,
        init_reqid=init_reqid
    )

    # Detect and replace a stale entry while holding the lock, so that the pending request
    # dictionary is never mutated without it. Overwriting has the same effect as delete + insert.
    with self._main_lock:
        already_registered = init_reqid in self._pending_sfd_upload_requests
        self._pending_sfd_upload_requests[init_reqid] = upload_request

    if already_registered:
        self._logger.error(f"An upload request already existed for init request ID: {init_reqid}")

    return upload_request
def wait_process(self, timeout: Optional[float] = None) -> None:
    """Wait for the SDK thread to execute fully at least once. Useful for testing.

    A synchronization is requested through the ``require_sync`` event, then the ``sync_complete`` event is awaited.

    :param timeout: Amount of time to wait for the completion of the thread loops. If ``None`` the default timeout will be used.

    :raises TimeoutException: Worker thread does not complete a full loop within the given timeout
    """
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)
    if timeout is None:
        timeout = self._timeout

    events = self._threading_events
    events.sync_complete.clear()    # Forget any previous completion before asking for a new one
    events.require_sync.set()
    events.sync_complete.wait(timeout=timeout)

    if events.sync_complete.is_set():
        return
    raise sdk.exceptions.TimeoutException(f"Worker thread did not complete a full loop within the {timeout} seconds.")
[docs]
def read_memory(self, address: int, size: int, timeout: Optional[float] = None) -> bytes:
    """Read the device memory synchronously.

    The request is first acknowledged by the server with a request token; the method then polls
    until a completion carrying that token shows up or the remaining time budget runs out.

    :param address: The start address of the region to read
    :param size: The size of the region to read, in bytes.
    :param timeout: Maximum amount of time to wait to get the data back. If ``None``, the default timeout value will be used

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: Failed to complete the reading
    :raises TimeoutException: If the read operation does not complete within the given timeout value

    :return: The data read from the device memory
    """
    validation.assert_int_range(address, 'address', minval=0)
    validation.assert_int_range(size, 'size', minval=1)
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)

    call_start = time.monotonic()
    if timeout is None:
        timeout = self._timeout

    request = self._make_request(
        API.Command.Client2Api.READ_MEMORY,
        {'address': address, 'size': size}
    )

    @dataclass(slots=True)
    class TokenHolder:
        # Mutable box that lets the callback hand the request token back to this scope
        token: Optional[str]

    holder: TokenHolder = TokenHolder(token=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        response = cast(api_typing.S2C.ReadMemory, response)
        if 'request_token' not in response:
            raise sdk.exceptions.BadResponseError('Missing request token in response')
        holder.token = response['request_token']

    future = self._send(request, on_response, timeout)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK or holder.token is None:
        raise sdk.exceptions.OperationFailure(f"Failed to read the device memory. {future.error_str}")

    # Whatever is left of the time budget after the request acknowledgement
    remaining_time = max(0, timeout - (time.monotonic() - call_start))
    request_token = holder.token

    poll_start = time.perf_counter()
    # No lock here because we have a 1 producer, 1 consumer scenario and we are waiting. We don't write
    while request_token not in self._memory_read_completion_dict and (time.perf_counter() - poll_start) < remaining_time:
        time.sleep(0.002)

    with self._main_lock:
        if request_token not in self._memory_read_completion_dict:
            raise sdk.exceptions.TimeoutException(
                "Did not get memory read result after %0.2f seconds. (address=0x%08X, size=%d)" % (timeout, address, size))
        # Consume the completion so it does not linger in the dictionary
        completion = self._memory_read_completion_dict.pop(request_token)

    if completion.success and completion.data is not None:
        return completion.data

    raise sdk.exceptions.OperationFailure(f"Failed to read the device memory. {completion.error}")
[docs]
def write_memory(self, address: int, data: bytes, timeout: Optional[float] = None) -> None:
    """Write the device memory synchronously. This method will exit once the write is completed otherwise will throw an exception in case of failure

    :param address: The start address of the region to write
    :param data: The data to write
    :param timeout: Maximum amount of time to wait to get the write completion confirmation. If ``None``, the default write timeout value will be used

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: Failed to complete the writing. This includes the case where the completion confirmation
        is not received within the given timeout value
    """
    validation.assert_int_range(address, 'address', minval=0)
    validation.assert_type(data, 'data', bytes)
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)

    time_start = time.monotonic()
    if timeout is None:
        # Use the write-specific default, as documented and as done by batch_write()
        timeout = self._write_timeout

    req = self._make_request(API.Command.Client2Api.WRITE_MEMORY, {
        'address': address,
        'data': b64encode(data).decode('ascii')
    })

    @dataclass(slots=True)
    class Container:
        # Mutable box that lets the callback hand the request token back to this scope
        obj: Optional[str]

    cb_data: Container = Container(obj=None)  # Force pass by ref

    def callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is not None and state == CallbackState.OK:
            response = cast(api_typing.S2C.WriteMemory, response)
            if 'request_token' not in response:
                raise sdk.exceptions.BadResponseError('Missing request token in response')
            cb_data.obj = response['request_token']

    future = self._send(req, callback, timeout)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK or cb_data.obj is None:
        raise sdk.exceptions.OperationFailure(f"Failed to write the device memory. {future.error_str}")

    # Whatever is left of the time budget after the request acknowledgement
    remaining_time = max(0, timeout - (time.monotonic() - time_start))
    request_token = cb_data.obj

    t = time.perf_counter()
    # No lock here because we have a 1 producer, 1 consumer scenario and are waiting. We don't write
    while request_token not in self._memory_write_completion_dict:
        if (time.perf_counter() - t) >= remaining_time:
            break
        time.sleep(0.002)

    with self._main_lock:
        if request_token not in self._memory_write_completion_dict:
            # Kept as OperationFailure (not TimeoutException) to preserve what existing callers catch
            raise sdk.exceptions.OperationFailure(
                "Did not get memory write completion confirmation after %0.2f seconds. (address=0x%08X, size=%d)" % (timeout, address, len(data)))
        # Consume the completion so it does not linger in the dictionary
        completion = self._memory_write_completion_dict.pop(request_token)

    if not completion.success:
        raise sdk.exceptions.OperationFailure(f"Failed to write the device memory. {completion.error}")
[docs]
def read_datalogging_acquisition(self, reference_id: str, timeout: Optional[float] = None) -> sdk.datalogging.DataloggingAcquisition:
    """Read a datalogging acquisition from the server storage, identified by its reference ID.

    :param reference_id: The acquisition unique ID
    :param timeout: The request timeout value. The default client timeout will be used if set to ``None``. Defaults to ``None``

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: If fetching the acquisition fails

    :return: An object containing the acquisition, including the data, the axes, the trigger index, the graph name, etc
    """
    validation.assert_type(reference_id, 'reference_id', str)
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)
    if timeout is None:
        timeout = self._timeout

    request = self._make_request(
        API.Command.Client2Api.READ_DATALOGGING_ACQUISITION_CONTENT,
        {'reference_id': reference_id}
    )

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the parsed acquisition back to this scope
        acquisition: Optional[sdk.datalogging.DataloggingAcquisition]

    holder: ResultHolder = ResultHolder(acquisition=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        typed_response = cast(api_typing.S2C.ReadDataloggingAcquisitionContent, response)
        holder.acquisition = api_parser.parse_read_datalogging_acquisition_content_response(typed_response)

    future = self._send(request, on_response)
    assert future is not None
    future.wait(timeout)

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(
            f"Failed to read the datalogging acquisition with reference ID '{reference_id}'. {future.error_str}")

    assert holder.acquisition is not None
    return holder.acquisition
[docs]
def start_datalog(self, config: sdk.datalogging.DataloggingConfig) -> sdk.datalogging.DataloggingRequest:
    """Request the device to make a datalogging acquisition based on the given configuration.

    On success, the created request handle is also stored among the pending datalogging requests,
    keyed by the request token returned by the server.

    :param config: The datalogging configuration including sampling rate, signals to log, trigger condition and operands, etc.

    :raises OperationFailure: If the request to the server fails
    :raises ValueError: Bad parameter value
    :raises TypeError: Given parameter not of the expected type

    :return: A `DataloggingRequest` handle that can provide the status of the acquisition process and used to fetch the data.
    """
    validation.assert_type(config, 'config', sdk.datalogging.DataloggingConfig)

    payload: api_typing.C2S.RequestDataloggingAcquisition = {
        'cmd': "",      # Will be overridden
        'reqid': 0,     # Will be overridden
        'condition': config._trigger_condition.value,
        'sampling_rate_id': config._sampling_rate,
        'decimation': config._decimation,
        'name': config._name,
        'timeout': config._timeout,
        'trigger_hold_time': config._trigger_hold_time,
        'probe_location': config._trigger_position,
        'x_axis_type': config._x_axis_type.value,
        'x_axis_signal': config._get_api_x_axis_signal(),
        'yaxes': config._get_api_yaxes(),
        'operands': config._get_api_trigger_operands(),
        'signals': config._get_api_signals(),
    }
    request = self._make_request(
        API.Command.Client2Api.REQUEST_DATALOGGING_ACQUISITION,
        cast(Dict[str, Any], payload)
    )

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the request handle back to this scope
        request: Optional[sdk.datalogging.DataloggingRequest]

    holder: ResultHolder = ResultHolder(request=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        token = api_parser.parse_request_datalogging_acquisition_response(
            cast(api_typing.S2C.RequestDataloggingAcquisition, response)
        )
        new_request = sdk.datalogging.DataloggingRequest(client=self, request_token=token)
        holder.request = new_request
        self._pending_datalogging_requests[token] = new_request

    future = self._send(request, on_response)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(
            f"Failed to request the datalogging acquisition'. {future.error_str}")

    assert holder.request is not None
    return holder.request
[docs]
def list_stored_datalogging_acquisitions(self,
                                         firmware_id: Optional[str] = None,
                                         before_datetime: Optional[datetime] = None,
                                         count: int = 500,
                                         timeout: Optional[float] = None) -> List[sdk.datalogging.DataloggingStorageEntry]:
    """Get the list of datalogging acquisitions stored in the server database.
    Acquisitions are returned ordered by acquisition time, from newest to oldest.

    :param firmware_id: When not ``None``, searches for acquisitions taken with this firmware ID
    :param before_datetime: An optional upper limit for the acquisition time. Will download acquisitions taken before this datetime.
        Meant to be used for UI lazy-loading
    :param count: Maximum number of acquisitions to fetch. Upper limit is 10000
    :param timeout: The request timeout value. The default client timeout will be used if set to ``None``. Defaults to ``None``

    :raises TypeError: Given parameter not of the expected type
    :raises ValueError: Given parameter has an invalid value
    :raises OperationFailure: If fetching the list fails

    :return: A list of datalogging storage entries with acquisition metadata in them.
    """
    validation.assert_type(firmware_id, 'firmware_id', (str, type(None)))
    timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)
    if before_datetime is not None:
        validation.assert_type(before_datetime, 'before_datetime', datetime)
    count = validation.assert_int_range(count, 'count', minval=0, maxval=10000)

    if timeout is None:
        timeout = self._timeout

    payload: Dict[str, Any] = {
        'firmware_id': firmware_id,
        'count': count,
        # The datetime limit is sent as a timestamp; None means no limit
        'before_timestamp': None if before_datetime is None else before_datetime.timestamp()
    }
    request = self._make_request(API.Command.Client2Api.LIST_DATALOGGING_ACQUISITION, payload)

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the parsed entries back to this scope
        entries: Optional[List[sdk.datalogging.DataloggingStorageEntry]]

    holder: ResultHolder = ResultHolder(entries=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        typed_response = cast(api_typing.S2C.ListDataloggingAcquisition, response)
        holder.entries = api_parser.parse_list_datalogging_acquisitions_response(typed_response)

    future = self._send(request, on_response)
    assert future is not None
    future.wait(timeout)

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(
            f"Failed to read the datalogging acquisition list from the server database. {future.error_str}")

    assert holder.entries is not None
    return holder.entries
[docs]
def user_command(self, subfunction: int, data: bytes = bytes()) -> sdk.UserCommandResponse:
    """Send a UserCommand request to the device with the given subfunction and data.

    UserCommand is a request that calls a user defined callback in the device firmware.
    It allows a developer to take advantage of the scrutiny protocol to communicate non-scrutiny data with its device.

    :param subfunction: Subfunction of the request. Validated to be within 0x00 and 0xFF.
        NOTE(review): previous documentation stated an upper bound of 0x7F - confirm which limit the protocol mandates
    :param data: The payload to send to the device

    :raises ValueError: Bad parameter value
    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the command completion fails

    :return: The parsed response to the UserCommand request
    """
    validation.assert_int_range(subfunction, 'subfunction', 0, 0xFF)
    validation.assert_type(data, 'data', bytes)

    # The binary payload is sent base64 encoded
    encoded_payload = b64encode(data).decode('utf8')
    request = self._make_request(API.Command.Client2Api.USER_COMMAND, {
        'subfunction': subfunction,
        'data': encoded_payload
    })

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the parsed response back to this scope
        response: Optional[sdk.UserCommandResponse]

    holder: ResultHolder = ResultHolder(response=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        holder.response = api_parser.parse_user_command_response(cast(api_typing.S2C.UserCommand, response))

    future = self._send(request, on_response)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK and holder.response is not None:
        return holder.response

    raise sdk.exceptions.OperationFailure(f"Failed to request the device UserCommand. {future.error_str}")
[docs]
def get_latest_server_status(self) -> ServerInfo:
    """Return an immutable structure of data containing the latest server status that has been broadcast.

    This makes no request to the server, it simply returns the latest value. See :meth:`request_server_status_update<request_server_status_update>`
    to fetch a new status update from the server

    :raises ConnectionError: If the connection to the server is lost
    :raises InvalidValueError: If the server status is not available (never received it).

    :return: The latest server status
    """
    # Avoid blocking on the main lock while it is held for a connection
    if self._locked_for_connect:
        raise sdk.exceptions.ConnectionError("Disconnected from server")

    with self._main_lock:
        if self._server_state != ServerState.Connected:
            raise sdk.exceptions.ConnectionError("Disconnected from server")
        latest_info = self._server_info

    if latest_info is None:
        raise sdk.exceptions.InvalidValueError("Server status is not available")

    # server_info is readonly and only its reference gets changed when updated.
    # We can safely return a reference here. The user can't mess it up
    return latest_info
[docs]
def get_device_info(self) -> Optional[sdk.DeviceInfo]:
    """Get all the available details about the device.

    This information includes device id, name, communication parameters, special memory regions, datalogging details, available sampling rates, etc.

    :raises OperationFailure: If the request to the server fails

    :return: The device information or ``None`` if no device is connected
    """
    request = self._make_request(API.Command.Client2Api.GET_DEVICE_INFO)

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the parsed response back to this scope
        info: Optional[sdk.DeviceInfo]

    holder: ResultHolder = ResultHolder(info=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        holder.info = api_parser.parse_get_device_info(cast(api_typing.S2C.GetDeviceInfo, response))

    future = self._send(request, on_response)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(f"Failed to read the device information. {future.error_str}")

    # May legitimately still be None at this point
    return holder.info
def get_loaded_sfd(self) -> Optional[sdk.SFDInfo]:
    """Read the details of the Scrutiny Firmware Description loaded on the server side.

    This information includes the firmware ID and the SFD metadata such as project name, project version, author, etc..

    :raises OperationFailure: If the request to the server fails

    :return: The loaded SFD details or ``None`` if no SFD is loaded on the server
    """
    req = self._make_request(API.Command.Client2Api.GET_LOADED_SFD)

    @dataclass(slots=True)
    class Container:
        # Mutable box that lets the callback hand the parsed response back to this scope
        obj: Optional[sdk.SFDInfo]

    cb_data: Container = Container(obj=None)  # Force pass by ref

    def callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is not None and state == CallbackState.OK:
            cb_data.obj = api_parser.parse_get_loaded_sfd(cast(api_typing.S2C.GetLoadedSFD, response))

    future = self._send(req, callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        # Report the operation that actually failed (this is not a device information request)
        raise sdk.exceptions.OperationFailure(f"Failed to read the loaded Scrutiny Firmware Description. {future.error_str}")

    # May legitimately still be None at this point
    return cb_data.obj
[docs]
def register_listener(self, listener: listeners.BaseListener) -> None:
    """Register a new listener. The client will notify it each time a new value update is received from the server.

    The listener is appended to the client's listener list while holding the main lock.

    :param listener: The listener to register
    """
    registered_listeners = self._listeners
    with self._main_lock:
        registered_listeners.append(listener)
[docs]
def get_watchable_count(self) -> Dict[ServerDatastoreContentType, int]:
    """Request from the server the number of available watchable items, organized per type.

    :raises ValueError: Bad parameter value
    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the command completion fails

    :return: A dictionary containing the number of watchables, classified by type
    """
    request = self._make_request(API.Command.Client2Api.GET_WATCHABLE_COUNT)

    @dataclass(slots=True)
    class ResultHolder:
        # Mutable box that lets the callback hand the parsed response back to this scope
        counts: Optional[Dict[sdk.ServerDatastoreContentType, int]]

    holder: ResultHolder = ResultHolder(counts=None)

    def wt_get_watchable_count_callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if response is None or state != CallbackState.OK:
            return
        holder.counts = api_parser.parse_get_watchable_count(cast(api_typing.S2C.GetWatchableCount, response))

    future = self._send(request, wt_get_watchable_count_callback)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK and holder.counts is not None:
        return holder.counts

    raise sdk.exceptions.OperationFailure(f"Failed to request the available watchable count. {future.error_str}")
[docs]
def download_watchable_list(self,
                            types: Optional[List[WatchableType]] = None,
                            max_per_response: int = 500,
                            name_patterns: Optional[List[str]] = None,
                            partial_reception_callback: Optional[Callable[[sdk.WatchableListContentPart, bool], None]] = None
                            ) -> WatchableListDownloadRequest:
    """Request the server for the list of watchable items available in its datastore.

    The returned data includes the path to the watchable and the properties that are common to all types of watchable (data type and enum)
    More information might be downloaded from a watchable by either calling :meth:`watch<scrutiny.sdk.client.ScrutinyClient.watch>`
    or :meth:`get_watchable_info<scrutiny.sdk.client.ScrutinyClient.get_watchable_info>`

    :param types: List of types to download. All of them if ``None``
    :param max_per_response: Maximum number of watchable per datagram sent by the server.
    :param name_patterns: List of name filters in the form of a glob string. Any watchable with a path that matches at least one name filter will be returned.
        All watchables are returned if ``None`` or if the list is empty
    :param partial_reception_callback: A callback to be called by the client whenever new data is received by the server. Data might be segmented
        in several parts. Expected signature : ``my_callback(data, last_segment)`` where ``data`` is a dictionary of the form ``data.<type>[path] = watchable``
        and ``last_segment`` indicate if that data segment was the last one.
        If ``None`` is given, the received data will be stored inside the request object and can be fetched once the request has completed by
        calling :meth:`get()<scrutiny.sdk.client.WatchableListDownloadRequest.get>`

        **Note** This callback is called from an internal thread.

    :raises ValueError: Bad parameter value
    :raises TypeError: Given parameter not of the expected type
    :raises ConnectionError: If the connection to the server is broken

    :return: A handle to the request object that can be used for synchronization (:meth:`wait_for_completion<scrutiny.sdk.client.WatchableListDownloadRequest.wait_for_completion>`)
        or cancel the request (:meth:`cancel<scrutiny.sdk.client.WatchableListDownloadRequest.cancel>`)
    """
    validation.assert_type(max_per_response, 'max_per_response', int)

    if types is None:
        types = [WatchableType.Alias, WatchableType.RuntimePublishedValue, WatchableType.Variable]
    if name_patterns is None:
        # None is documented as "no name filter"; a fresh list also avoids a shared mutable default
        name_patterns = []

    validation.assert_type(types, 'types', list)
    for watchable_type in types:
        validation.assert_type(watchable_type, 'types', WatchableType)
        if watchable_type not in WatchableType.all():
            raise ValueError(f"Watchable type {watchable_type} is not a valid type to download")

    validation.assert_type(name_patterns, 'name_patterns', list)
    for name_pattern in name_patterns:
        validation.assert_type(name_pattern, 'name_pattern', str)

    # Names given to each watchable type in the request filter
    watchable_type_names = {
        WatchableType.Alias: "alias",
        WatchableType.RuntimePublishedValue: "rpv",
        WatchableType.Variable: "var",
    }

    filter_dict: Dict[str, List[str]] = {
        "type": [watchable_type_names[wt] for wt in types]
    }
    # The name filter is only sent when at least one pattern is given
    if len(name_patterns) > 0:
        filter_dict['name'] = name_patterns

    req = self._make_request(API.Command.Client2Api.GET_WATCHABLE_LIST, {
        'max_per_response': max_per_response,
        'filter': filter_dict
    })

    request_handle = WatchableListDownloadRequest(client=self, request_id=req['reqid'], new_data_callback=partial_reception_callback)

    # Register the handle before sending so it is already known when responses come in
    with self._main_lock:
        self._pending_watchable_download_request[req['reqid']] = request_handle

    self._send(req)
    # responses will be caught by the worker thread and the request handle will be updated
    # using the response request_id echo.
    return request_handle
[docs]
def clear_datalogging_storage(self) -> None:
    """Delete all datalogging acquisitions stored on the server.
    This action is irreversible.

    :raises OperationFailure: If the request to the server fails
    """
    request = self._make_request(API.Command.Client2Api.DELETE_ALL_DATALOGGING_ACQUISITION)

    def ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        pass

    # The response content is not used, only the completion state of the request matters
    future = self._send(request, ignore_response)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK:
        return
    raise sdk.exceptions.OperationFailure(f"Failed to clear the datalogging storage. {future.error_str}")
[docs]
def delete_datalogging_acquisition(self, reference_id: str) -> None:
    """Delete a single datalogging acquisition stored on the server.
    This action is irreversible.

    :param reference_id: The unique ``reference_id`` of the acquisition to delete.

    :raises OperationFailure: If the request to the server fails
    :raises TypeError: Given parameter not of the expected type
    """
    validation.assert_type(reference_id, 'reference_id', str)

    request = self._make_request(
        API.Command.Client2Api.DELETE_DATALOGGING_ACQUISITION,
        {'reference_id': reference_id}
    )

    def ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        pass

    # The response content is not used, only the completion state of the request matters
    future = self._send(request, ignore_response)
    assert future is not None
    future.wait()

    if future.state == CallbackState.OK:
        return
    raise sdk.exceptions.OperationFailure(
        f"Failed to delete the datalogging acquisition with reference ID: {reference_id}. {future.error_str}")
[docs]
def update_datalogging_acquisition(self, reference_id: str, name: str) -> None:
    """Rename a single datalogging acquisition stored on the server.

    :param reference_id: The unique ``reference_id`` of the acquisition to update.
    :param name: New name for the acquisition.

    :raises OperationFailure: If the request to the server fails
    :raises TypeError: Given parameter not of the expected type
    """
    # Validate in the same order as the parameters are declared
    validation.assert_type(reference_id, 'reference_id', str)
    validation.assert_type(name, 'name', str)

    def _ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        # Nothing to extract from the response; the completion state is checked below
        return None

    request = self._make_request(
        API.Command.Client2Api.UPDATE_DATALOGGING_ACQUISITION,
        {'reference_id': reference_id, 'name': name}
    )
    pending = self._send(request, _ignore_response)
    assert pending is not None
    pending.wait()

    if pending.state == CallbackState.OK:
        return
    raise sdk.exceptions.OperationFailure(
        f"Failed to update the datalogging acquisition with reference ID: {reference_id}. {pending.error_str}")
[docs]
def request_demo_mode(self, enable: bool) -> None:
    """Enable or disable the server demo mode

    :param enable: Enable the demo mode when ``True``. Disable it when ``False``

    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the request to the server fails
    """
    validation.assert_type(enable, 'enable', bool)

    req = cast(api_typing.C2S.DemoMode,
               self._make_request(API.Command.Client2Api.DEMO_MODE, {
                   'enable': enable
               })
               )

    def callback(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        # Nothing to extract from the response; the completion state is checked below
        pass

    future = self._send(req, callback)
    assert future is not None
    future.wait()

    if future.state != CallbackState.OK:
        # Include the failure cause, like the other blocking requests of this client do
        raise sdk.exceptions.OperationFailure(f"Failed to put the server in demo mode. {future.error_str}")
def has_event_pending(self) -> bool:
    """Tell whether the event queue currently holds at least one unread event"""
    queue_is_empty = self._event_queue.empty()
    return not queue_is_empty
[docs]
def read_event(self, timeout: Optional[float] = None) -> Optional[Events._ANY_EVENTS]:
    """
    Pop the next event from the event queue, blocking until one is available

    :param timeout: Maximum amount of time to block. Blocks indefinitely if ``None``

    :return: The next event in the queue or ``None`` if no event showed up before the timeout expired
    """
    try:
        event = self._event_queue.get(block=True, timeout=timeout)
    except queue.Empty:
        # Timeout expired without any event being queued
        event = None
    return event
[docs]
def clear_event_queue(self) -> None:
    """Discard every event still waiting inside the event queue"""
    pending_events = self._event_queue
    pending_events.deplete()
[docs]
def get_local_stats(self) -> Statistics:
    """Return a snapshot of the client internal performance metrics"""
    rates = self._datarate_measurements
    # Read the averagers in a fixed order: reception first, then transmission
    rx_data_rate = rates.rx_data_rate.get_value()
    rx_message_rate = rates.rx_message_rate.get_value()
    tx_data_rate = rates.tx_data_rate.get_value()
    tx_message_rate = rates.tx_message_rate.get_value()

    return self.Statistics(
        rx_data_rate=rx_data_rate,
        rx_message_rate=rx_message_rate,
        tx_data_rate=tx_data_rate,
        tx_message_rate=tx_message_rate
    )
def reset_local_stats(self) -> None:
    """Reset every resettable performance metric (those that have an internal state)"""
    measurements = self._datarate_measurements
    measurements.reset()
[docs]
def get_server_stats(self, timeout: Optional[float] = None) -> sdk.ServerStatistics:
    """Request the server internal performance statistics and wait for the answer.

    :param timeout: Maximum time to wait for the response. Uses the default client timeout if ``None``.

    :raises OperationFailure: If the request fails or the server returns an error.

    :returns: A :class:`ServerStatistics<scrutiny.sdk.ServerStatistics>` object.
    """
    wait_time = self._timeout if timeout is None else timeout

    @dataclass(slots=True)
    class ResultHolder:
        value: Optional[sdk.ServerStatistics]

    # Mutable holder shared with the callback so the parsed result can be handed back
    holder: ResultHolder = ResultHolder(value=None)

    def on_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        if state != CallbackState.OK or response is None:
            return
        typed_response = cast(api_typing.S2C.GetServerStats, response)
        holder.value = api_parser.parser_server_stats(typed_response)

    request = self._make_request(API.Command.Client2Api.GET_SERVER_STATS)
    pending = self._send(request, on_response)
    assert pending is not None
    pending.wait(wait_time)

    if pending.state != CallbackState.OK:
        raise sdk.exceptions.OperationFailure(
            f"Failed to read the server statistics. {pending.error_str}")

    # A successful completion implies the callback stored the parsed statistics
    assert holder.value is not None
    return holder.value
[docs]
def set_server_throttling(self, max_update_rate: float) -> None:
    """Ask the server to throttle its stream of updates to a maximum rate.

    :param max_update_rate: The maximum update rate in update/sec. A value of ``0`` disables the throttling.

    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the request to the server fails
    """
    validation.assert_float_range(max_update_rate, 'max_update_rate', minval=0)

    def _ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        # Nothing to extract from the response; the completion state is checked below
        return None

    raw_request = self._make_request(API.Command.Client2Api.SET_THROTTLING, {'update_rate': max_update_rate})
    request = cast(api_typing.C2S.SetThrottling, raw_request)

    pending = self._send(request, _ignore_response)
    assert pending is not None
    pending.wait(self._timeout)

    if pending.state == CallbackState.OK:
        return
    raise sdk.exceptions.OperationFailure(f"Failed to configure the server throttling. {pending.error_str}")
def write_watchable(self, server_path: str, value: Union[int, float, bool, str]) -> None:
    """Write a value to a single watchable addressed by its server path.

    Blocks until the request completes or the client timeout expires.

    :param server_path: The path of the watchable on the server
    :param value: The value to write

    :raises TypeError: Given parameter not of the expected type
    :raises OperationFailure: If the request to the server fails
    """
    validation.assert_type(server_path, 'server_path', str)
    validation.assert_type(value, 'value', (int, float, bool, str))

    def _ignore_response(state: CallbackState, response: Optional[api_typing.S2CMessage]) -> None:
        # Nothing to extract from the response; the completion state is checked below
        return None

    raw_request = self._make_request(API.Command.Client2Api.WRITE_SINGLE_WATCHABLE, {
        'server_path': server_path,
        'value': value,
    })
    request = cast(api_typing.C2S.WriteSingleWatchable, raw_request)

    pending = self._send(request, _ignore_response)
    assert pending is not None
    pending.wait(self._timeout)

    if pending.state == CallbackState.OK:
        return
    raise sdk.exceptions.OperationFailure(f"Failed to write the watchable {server_path}. {pending.error_str}")
@property
def logger(self) -> logging.Logger:
    """The python logger instance this client logs to"""
    client_logger = self._logger
    return client_logger
@property
def name(self) -> str:
    """Optional human-readable name given to this client instance. An empty string is returned when no name is set"""
    if self._name is None:
        return ''
    return self._name
@property
def server_state(self) -> ServerState:
    """The state of the communication with the server"""
    with self._user_lock:
        # Rebuild a ServerState from the stored value instead of handing out the stored object
        state_snapshot = ServerState(self._server_state)
    return state_snapshot
@property
def hostname(self) -> Optional[str]:
    """Hostname of the server. ``None`` when no hostname is stored"""
    with self._user_lock:
        if self._hostname is None:
            return None
        return str(self._hostname)
@property
def port(self) -> Optional[int]:
    """Port the server is listening to. ``None`` when no port is stored"""
    with self._user_lock:
        if self._port is None:
            return None
        return int(self._port)