# watchable_handle.py
# A handle on a watchable element (Variable, Alias, RPV). This handle is created by
# the client when watching
#
# - License : MIT - See LICENSE file
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2023 Scrutiny Debugger
__all__ = ['WatchableHandle']
import threading
from datetime import datetime
import time
from scrutiny.sdk.definitions import *
from scrutiny.core.basic_types import *
from scrutiny.core.embedded_enum import EmbeddedEnum
import scrutiny.sdk.exceptions as sdk_exceptions
from scrutiny.sdk.write_request import WriteRequest
from scrutiny.tools import validation, deprecated
from scrutiny.tools.typing import *
from scrutiny.core import path_tools
if TYPE_CHECKING:
from scrutiny.sdk.client import ScrutinyClient
ValType = Union[int, float, bool]
[docs]
class WatchableHandle:
"""A handle to a server watchable element (Variable / Alias / RuntimePublishedValue) that gets updated by the client thread."""
__slots__ = (
'_client',
'_server_path',
'_shortname',
'_configuration',
'_lock',
'_status',
'_value',
'_last_value_dt',
'_last_write_dt',
'_update_counter',
'_dead',
'_requested_update_rate'
)
_client: "ScrutinyClient"
"""The client that created this handle"""
_server_path: str
"""The tree-like path known to the server"""
_shortname: str
"""Name of the last element in the server path"""
_configuration: Optional[BaseDetailedWatchableConfiguration]
"""Details obtained by the server after a call to watch()"""
_lock: threading.Lock
"""A lock to access the value"""
_status: ValueStatus
"""Status of the value. Tells if the value is valid or not and why it is invalid if not"""
_value: Optional[ValType]
"""Contains the latest value gotten by the client"""
_last_value_dt: Optional[datetime]
"""Datetime of the last value update by the client"""
_last_write_dt: Optional[datetime]
"""Datetime of the last completed write on this element"""
_update_counter: int
"""A counter that gets incremented each time the value is updated"""
_dead: bool
"""A one-shot flag that indicates if the handle is dead forever"""
_requested_update_rate: Optional[float]
"""The update rate requested for this handle"""
def __init__(self, client: "ScrutinyClient", server_path: str, requested_update_rate: Optional[float]) -> None:
self._client = client
self._server_path = server_path
self._shortname = path_tools.make_segments(server_path)[-1]
self._configuration = None
self._lock = threading.Lock()
self._update_counter = 0
self._last_write_dt = None
self._dead = False
self._set_invalid(ValueStatus.NeverSet)
self._requested_update_rate = requested_update_rate
def _set_requested_update_rate(self, update_rate: Optional[float]) -> None:
self._requested_update_rate = update_rate
def __repr__(self) -> str:
"""Return a developer-friendly string representation including the name, datatype, and object address."""
addr = "0x%0.8x" % id(self)
if self._configuration is None:
return f'<{self.__class__.__name__} "{self._shortname}" [Unconfigured] at {addr}>'
return f'<{self.__class__.__name__} "{self._shortname}" [{self._configuration.datatype.name}] at {addr}>'
def _configure(self, config: BaseDetailedWatchableConfiguration) -> None:
"""Store the server-provided configuration and reset the value state.
Called by the client after the server confirms a subscription.
:param config: The :class:`BaseDetailedWatchableConfiguration` returned by the server.
"""
with self._lock:
self._configuration = config
self._status = ValueStatus.NeverSet
self._value = None
self._last_value_dt = None
self._update_counter = 0
def _set_last_write_datetime(self, dt: Optional[datetime] = None) -> None:
"""Record the datetime of the most recent completed write operation.
:param dt: The completion datetime. Uses the current wall-clock time when ``None``.
"""
if dt is None:
dt = datetime.now()
with self._lock:
self._last_write_dt = dt
def _update_value(self, val: Optional[ValType], timestamp: Optional[datetime] = None) -> None:
"""Update the cached value and mark the status as ``ValueStatus.Valid``.
No-op if the status is ``ValueStatus.ServerGone``.
Called by the client worker thread when a watchable-update message is received.
:param val: The new value.
:param timestamp: Server-side timestamp of the update. Uses the local wall clock when ``None``.
"""
with self._lock:
if self._status != ValueStatus.ServerGone:
self._status = ValueStatus.Valid
self._value = val
self._last_value_dt = timestamp if timestamp is not None else datetime.now()
self._update_counter += 1 # unbound in size in python 3
else:
self._value = None
def _set_dead(self, status: ValueStatus) -> None:
"""Set the handle as "dead". Meaning it cannot be used anymore. the value is also marked invalid
:param status: The new :class:`ValueStatus` to assign. Must not be ``ValueStatus.Valid``.
"""
self._dead = True
self._set_invalid(status)
def _set_invalid(self, status: ValueStatus, timestamp: Optional[datetime] = None) -> None:
"""Clear the cached value and set a non-``Valid`` status.
:param status: The new :class:`ValueStatus` to assign. Must not be ``ValueStatus.Valid``.
:param timestamp: Time at which the value as been set invalid.
"""
assert status != ValueStatus.Valid
with self._lock:
self._value = None
self._status = status
self._last_value_dt = timestamp if timestamp is not None else datetime.now()
def _read(self) -> ValType:
"""Return the current cached value.
:raises InvalidValueError: If the value is ``None`` or the status is not ``ValueStatus.Valid``.
"""
val, val_status = self.get_value_and_status() # Thread safe
if val is None or val_status != ValueStatus.Valid:
raise sdk_exceptions.InvalidValueError(f"Value of {self._shortname} is unusable. {val_status._get_error()}")
return val
def _write(self, val: Union[ValType, str], parse_enum: bool) -> WriteRequest:
"""Submit a value write to the server and wait for it to complete (unless a batch write is active).
:param val: The value to write. A ``str`` is required when ``parse_enum`` is ``True``.
:param parse_enum: When ``True``, ``val`` is interpreted as an enum name and converted to its integer value.
:raises ValueError: If ``parse_enum`` is ``True`` but ``val`` is not a ``str``.
:raises BadEnumError: If ``parse_enum`` is ``True`` and ``val`` is not a valid enumerator name.
:returns: The :class:`WriteRequest<scrutiny.sdk.write_request.WriteRequest>` that was submitted.
"""
if parse_enum:
if not isinstance(val, str):
raise ValueError(f"Value is not an enum string")
val = self.parse_enum_val(val) # check for enum is done inside this
write_request = WriteRequest(self, val)
self._client._process_write_request(write_request)
if not self._client._is_batch_write_in_progress():
write_request.wait_for_completion()
return write_request
def _assert_has_enum(self) -> None:
"""Assert that the watchable has an enum associated with it.
:raises BadEnumError: If no enum is defined for this watchable.
"""
if not self.has_enum():
raise sdk_exceptions.BadEnumError(f"Watchable {self._shortname} has no enum defined")
def _assert_configured(self) -> None:
"""Assert that the handle has been configured by the server after a successful watch subscription.
:raises InvalidValueError: If the handle has not yet been configured.
"""
if self._configuration is None:
raise sdk_exceptions.InvalidValueError("This watchable handle is not ready to be used")
def unwatch(self) -> None:
"""Stop watching this item by unsubscribing to the server. Marks the handle as "dead".
See :attr:`is_dead<scrutiny.sdk.watchable_handle.WatchableHandle.is_dead>`
:raises NameNotFoundError: If the required path is not presently being watched
:raises OperationFailure: If the subscription cancellation failed in any way
"""
self._client.unwatch(self._server_path)
[docs]
def wait_update(self, timeout: float, previous_counter: Optional[int] = None, sleep_interval: float = 0.02) -> None:
"""Wait for the value to be updated by the server
:param timeout: Amount of time to wait for a value update
:param previous_counter: Optional update counter to use for change detection. Can be set to ``update_counter+N`` to wait for N updates
:param sleep_interval: Value passed to ``time.sleep`` while waiting
:raises TypeError: Given parameter not of the expected type
:raises ValueError: Given parameter has an invalid value
:raises InvalidValueError: If the watchable becomes invalid while waiting
:raises TimeoutException: If no value update happens within the given timeout
"""
timeout = validation.assert_float_range(timeout, 'timeout', minval=0)
validation.assert_int_range_if_not_none(previous_counter, 'previous_counter', minval=0)
t1 = time.monotonic()
entry_counter = self._update_counter if previous_counter is None else previous_counter
while True:
if time.monotonic() - t1 > timeout:
raise sdk_exceptions.TimeoutException(f'Value of {self._shortname} did not update in {timeout}s')
# No lock on purpose. Status can only go once to NeverSet or Valid
if self._status != ValueStatus.NeverSet and self._status != ValueStatus.Valid:
raise sdk_exceptions.InvalidValueError(self._status._get_error())
if entry_counter != self._update_counter:
break
time.sleep(sleep_interval)
[docs]
def wait_value(self, value: Union[ValType, str], timeout: float, sleep_interval: float = 0.02) -> None:
"""
Wait for the watchable to reach a given value. Raises an exception if it does not happen within a timeout value
:param value: The value that this watchable must have to exit the wait state
:param timeout: Maximum amount of time to wait for the given value
:param sleep_interval: Value passed to ``time.sleep`` while waiting
:raises TypeError: Given parameter not of the expected type
:raises ValueError: Given parameter has an invalid value
:raises BadEnumError: If ``value`` is a string and no enumerator value matches it
:raises InvalidValueError: If the watchable becomes invalid while waiting
:raises TimeoutException: If the watchable value never changes for the given value within the given timeout
"""
timeout = validation.assert_float_range(timeout, 'timeout', minval=0)
sleep_interval = validation.assert_float_range(sleep_interval, 'sleep_interval', minval=0)
if isinstance(value, str):
value = self.parse_enum_val(value)
if value < 0 and not self.datatype.is_signed():
raise ValueError(f"{self._shortname} is unsigned and will never have a negative value as requested")
t1 = time.monotonic()
while True:
if time.monotonic() - t1 > timeout:
raise sdk_exceptions.TimeoutException(f'Value of {self._shortname} did not set to {value} in {timeout}s')
if self._status != ValueStatus.NeverSet:
if self.datatype.is_float():
if float(value) == self.value_float:
break
elif self.datatype.is_integer():
if int(value) == self.value_int:
break
elif self.datatype == EmbeddedDataType.boolean:
if bool(value) == self.value_bool:
break
time.sleep(sleep_interval)
[docs]
def has_enum(self) -> bool:
"""Tells if the watchable has an enum associated with it"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.has_enum()
[docs]
def get_enum(self) -> EmbeddedEnum:
""" Returns the enum associated with this watchable
:raises BadEnumError: If the watchable has no enum assigned
"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.get_enum()
[docs]
def parse_enum_val(self, val: str) -> int:
"""Converts an enum value name (string) to the underlying integer value
:param val: The enumerator name to convert
:raises BadEnumError: If the watchable has no enum assigned or the given value is not a valid enumerator
:raises TypeError: Given parameter not of the expected type
"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.parse_enum_val(val)
def get_value_and_status(self) -> Tuple[Optional[ValType], ValueStatus]:
"""Returns a tuple with the value and the value status.
If the status is :attr:`Valid<scrutiny.sdk.ValueStatus.Valid>`, then the value is guaranteed to contain a value.
If status != :attr:`Valid<scrutiny.sdk.ValueStatus.Valid>`, the value will be ``None``. This method does not raise an exception on invalid values.
"""
with self._lock:
val = self._value
val_status = self._status
return (val, val_status)
[docs]
def change_update_rate(self, update_rate: Optional[float]) -> Optional[float]:
"""Request the server to change the target update rate for this watchable (optionally set when
calling :meth:`watch()<scrutiny.sdk.client.ScrutinyClient.watch>`).
When there are multiple clients watching the same watchable, the server applies the fastest required rate.
:param update_rate: The new polling rate. A value of ``None`` indicates that updates should happen as fast as possible.
Must be ``None`` or greater or equal to 1
:return: The effective update rate at the moment of change. May be higher or change later if another client requires it.
:raises TypeError: Given parameter not of the expected type
:raises ValueError: Given parameter has an invalid value
:raises OperationFailure: If the request fails to complete
"""
return self._client._change_update_rate(self, update_rate)
@property
@deprecated("Replaced by server_path")
def display_path(self) -> str:
"""[DEPRECATED] Replaced by :attr:`server_path`"""
return self._server_path
@property
def server_path(self) -> str:
"""Returns the watchable full tree path given by the server"""
return self._server_path
@property
def name(self) -> str:
"""Returns the watchable name, e.g. the basename in the server_path"""
return self._shortname
@property
def type(self) -> WatchableType:
"""The watchable type. Variable, Alias or RuntimePublishedValue"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.watchable_type
@property
def datatype(self) -> EmbeddedDataType:
"""The data type of the device element pointed by this watchable. (sint16, float32, etc.)"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.datatype
@property
def server_id(self) -> str:
"""The unique ID assigned by the server for this watchable"""
self._assert_configured()
assert self._configuration is not None
return self._configuration.server_id
@property
def value(self) -> ValType:
"""The value without cast.
- When reading, returns a ``int``, ``float`` or ``bool``.
- When writing, accepts ``int``, ``float``, ``bool`` or a ``str``.
If a string is assigned, the value is sent "as is" to the server which will then try to parse it.
The server will accepts "true", "false" or a mathematical expression supporting arithmetic operators (``+``, ``-``, ``*``, ``/``, ``^``),
base prefix (``0x``, ``0b``), scientific notation (1.5e-2), constants (such as pi) and common math functions. including: ``abs``, ``exp``, ``pow``, ``sqrt``, ``mod``,
``ceil``, ``floor``, ``log``, ``ln``, ``log10``,
``hypot``, ``degrees``, ``radians``,
``cos``, ``cosh``, ``acos``, ``sin``, ``sinh``, ``asin``, ``tan``, ``tanh``, ``atan``, ``atan2``
:raises InvalidValueError: When reading, if the value has never been set or the handle is no longer valid.
:raises OperationFailure: When writing, if the server fails to complete the write.
"""
return self._read()
@value.setter
def value(self, val: Union[ValType, str]) -> None:
self._write(val, parse_enum=False)
@property
def value_bool(self) -> bool:
"""The value cast as ``bool``"""
return bool(self.value)
@property
def value_int(self) -> int:
"""The value cast as ``int``"""
return int(self.value)
@property
def value_float(self) -> float:
"""The value cast as ``float``"""
return float(self.value)
@property
def value_enum(self) -> str:
"""The value converted to its first enum name (alphabetical order). Returns a string. Can be written with a string.
:raises BadEnumError: When reading, if the watchable has no enum defined. When writing, if ``val`` is not a valid enumerator name.
:raises ValueError: When writing, if the assigned value is not a ``str``.
:raises OperationFailure: When writing, if the server fails to complete the write.
"""
val_int = self.value_int
self._assert_configured()
assert self._configuration is not None
self._assert_has_enum()
assert self._configuration.enum is not None
for k in sorted(self._configuration.enum.vals.keys()):
if self._configuration.enum.vals[k] == val_int:
return k
raise sdk_exceptions.InvalidValueError(
f"Watchable {self._shortname} has value {val_int} which is not a valid enum value for enum {self._configuration.enum.name}")
@value_enum.setter
def value_enum(self, val: str) -> None:
self._write(val, parse_enum=True)
@property
def last_update_timestamp(self) -> Optional[datetime]:
"""Time of the last value update. ``None`` if not updated at least once. Not reliable for change detection"""
return self._last_value_dt
@property
def last_write_timestamp(self) -> Optional[datetime]:
"""Time of the last successful write operation. ``None`` if never written"""
return self._last_write_dt
@property
def update_counter(self) -> int:
"""Number of value update gotten since the creation of the handle. Can be safely used for change detection"""
return self._update_counter
@property
def is_dead(self) -> bool:
"""Flag indicating if this handle is dead, meaning it will never be updated in the future. Once a handle is dead, it can be disposed of.
Unwatching a handle will mark it "dead". """
return self._dead
@property
def status(self) -> ValueStatus:
"""Return the value status. Refer the :meth:`get_value_and_status()<scrutiny.sdk.watchable_handle.WatchableHandler.get_value_and_status>` to
read the value and the status together atomicly."""
return ValueStatus(self._status)
@property
def var_details(self) -> DetailedVarWatchableConfiguration:
"""Returns the variable-specific metadata.
:raises BadTypeError: If the watchable :attr:`type` is not :attr:`Variable<scrutiny.sdk.WatchableType.Variable>`.
"""
self._assert_configured()
if not isinstance(self._configuration, DetailedVarWatchableConfiguration):
raise sdk_exceptions.BadTypeError(f"Watchable {self._shortname} is not a variable. Type={self.type.name}")
return self._configuration
@property
def alias_details(self) -> DetailedAliasWatchableConfiguration:
"""Returns the alias-specific metadata.
:raises BadTypeError: If the watchable :attr:`type` is not :attr:`Alias<scrutiny.sdk.WatchableType.Alias>`.
"""
self._assert_configured()
if not isinstance(self._configuration, DetailedAliasWatchableConfiguration):
raise sdk_exceptions.BadTypeError(f"Watchable {self._shortname} is not an alias. Type={self.type.name}")
return self._configuration
@property
def rpv_details(self) -> DetailedRPVWatchableConfiguration:
"""Returns the RPV-specific metadata.
:raises BadTypeError: If the watchable :attr:`type` is not :attr:`RuntimePublishedValue<scrutiny.sdk.WatchableType.RuntimePublishedValue>`.
"""
self._assert_configured()
if not isinstance(self._configuration, DetailedRPVWatchableConfiguration):
raise sdk_exceptions.BadTypeError(f"Watchable {self._shortname} is not a Runtime Published Value. Type={self.type.name}")
return self._configuration
@property
def requested_update_rate(self) -> Optional[float]:
return self._requested_update_rate