# Source code for scrutiny.sdk.watchable_handle

#    watchable_handle.py
#        A handle on a watchable element (Variable, Alias, RPV). This handle is created by
#        the client when watching
#
#   - License : MIT - See LICENSE file.
#   - Project :  Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-python)
#
#   Copyright (c) 2021 Scrutiny Debugger

import threading
from datetime import datetime
import time

from scrutiny.sdk.definitions import *
from scrutiny.core.basic_types import *
from scrutiny.core.embedded_enum import EmbeddedEnum
import scrutiny.sdk.exceptions as sdk_exceptions
from scrutiny.sdk.write_request import WriteRequest
from scrutiny.tools import validation
from typing import Optional, Union, TYPE_CHECKING

if TYPE_CHECKING:
    from scrutiny.sdk.client import ScrutinyClient


# Python types that a watchable value can take (type of ``WatchableHandle.value``)
ValType = Union[int, float, bool]

# Public names exported by this module
__all__ = ['WatchableHandle']

class WatchableHandle:
    """A handle to a server watchable element (Variable / Alias / RuntimePublishedValue) that gets updated by the client thread."""

    _client: "ScrutinyClient"   # The client that created this handle
    _display_path: str          # The display path
    _shortname: str             # Name of the last element in the display path
    _configuration: Optional[WatchableConfiguration]    # Set by _configure(). None until then
    _lock: threading.Lock       # A lock to access the value
    _status: ValueStatus        # Status of the value. Tells if the value is valid or not and why it is invalid if not
    _value: Optional[ValType]   # Contain the latest value gotten by the client
    _last_value_dt: Optional[datetime]  # Datetime of the last value update by the client
    _last_write_dt: Optional[datetime]  # Datetime of the last completed write on this element
    _update_counter: int        # A counter that gets incremented each time the value is updated

    def __init__(self, client: "ScrutinyClient", display_path: str) -> None:
        """Create an unconfigured handle with a ``NeverSet`` status.

        :param client: The client that creates this handle
        :param display_path: The full tree path of the watchable
        """
        self._client = client
        self._display_path = display_path
        self._shortname = display_path.split('/')[-1]
        self._configuration = None
        self._lock = threading.Lock()
        self._update_counter = 0
        # A class-level annotation alone does not create the attribute. Without these
        # assignments, last_update_timestamp / last_write_timestamp raise AttributeError
        # on a fresh handle instead of returning None.
        self._last_value_dt = None
        self._last_write_dt = None
        self._set_invalid(ValueStatus.NeverSet)

    def __repr__(self) -> str:
        addr = "0x%0.8x" % id(self)
        if self._configuration is None:
            return f'<{self.__class__.__name__} "{self._shortname}" [Unconfigured] at {addr}>'
        return f'<{self.__class__.__name__} "{self._shortname}" [{self._configuration.datatype.name}] at {addr}>'

    def _configure(self, config: WatchableConfiguration) -> None:
        """Assign the configuration and reset the value, its status, its timestamp and the update counter."""
        with self._lock:
            self._configuration = config
            self._status = ValueStatus.NeverSet
            self._value = None
            self._last_value_dt = None
            self._update_counter = 0

    def _set_last_write_datetime(self, dt: Optional[datetime] = None) -> None:
        """Record the datetime of the last write. Uses ``datetime.now()`` when ``dt`` is ``None``."""
        if dt is None:
            dt = datetime.now()

        with self._lock:
            self._last_write_dt = dt

    def _update_value(self, val: ValType, timestamp: Optional[datetime] = None) -> None:
        """Store a new value and mark it valid, unless the status is ``ServerGone``.

        :param val: The new value
        :param timestamp: Datetime of the update. Uses ``datetime.now()`` when ``None``
        """
        with self._lock:
            if self._status != ValueStatus.ServerGone:
                self._status = ValueStatus.Valid
                self._value = val
                self._last_value_dt = timestamp if timestamp is not None else datetime.now()
                self._update_counter += 1   # unbound in size in python 3
            else:
                # ServerGone is kept as is: the incoming value is dropped
                self._value = None

    def _set_invalid(self, status: ValueStatus) -> None:
        """Clear the value and set a non-valid status."""
        assert status != ValueStatus.Valid

        with self._lock:
            self._value = None
            self._status = status

    def _read(self) -> ValType:
        """Return the latest value.

        :raise InvalidValueError: If there is no value or the status is not ``Valid``
        """
        # Take a consistent snapshot under the lock, evaluate it outside
        with self._lock:
            val = self._value
            val_status = self._status

        if val is None or val_status != ValueStatus.Valid:
            raise sdk_exceptions.InvalidValueError(f"Value of {self._shortname} is unusable. {val_status._get_error()}")

        return val

    def _write(self, val: Union[ValType, str], parse_enum: bool) -> WriteRequest:
        """Submit a write request to the client and wait for its completion, unless a batch write is in progress.

        :param val: The value to write
        :param parse_enum: When ``True``, ``val`` must be an enumerator name and is converted with :meth:`parse_enum_val`

        :raise ValueError: If ``parse_enum`` is ``True`` and ``val`` is not a string
        """
        if parse_enum:
            if not isinstance(val, str):
                raise ValueError("Value is not an enum string")
            val = self.parse_enum_val(val)  # check for enum is done inside this

        write_request = WriteRequest(self, val)
        self._client._process_write_request(write_request)
        if not self._client._is_batch_write_in_progress():
            write_request.wait_for_completion()
        return write_request

    def _assert_has_enum(self) -> None:
        """Raise ``BadEnumError`` if the watchable has no enum."""
        if not self.has_enum():
            raise sdk_exceptions.BadEnumError(f"Watchable {self._shortname} has no enum defined")

    def _assert_configured(self) -> None:
        """Raise ``InvalidValueError`` if no configuration has been assigned yet."""
        if self._configuration is None:
            raise sdk_exceptions.InvalidValueError("This watchable handle is not ready to be used")

    def unwatch(self) -> None:
        """Stop watching this item by unsubscribing to the server

        :raise NameNotFoundError: If the required path is not presently being watched
        :raise OperationFailure: If the subscription cancellation failed in any way
        """
        self._client.unwatch(self._display_path)

    def wait_update(self, timeout: float, previous_counter: Optional[int] = None, sleep_interval: float = 0.02) -> None:
        """Wait for the value to be updated by the server

        :param timeout: Amount of time to wait for a value update
        :param previous_counter: Optional update counter to use for change detection. Can be set to ``update_counter+N`` to wait for N updates
        :param sleep_interval: Value passed to ``time.sleep`` while waiting

        :raise TypeError: Given parameter not of the expected type
        :raise ValueError: Given parameter has an invalid value
        :raise InvalidValueError: If the watchable becomes invalid while waiting
        :raise TimeoutException: If no value update happens within the given timeout
        """
        timeout = validation.assert_float_range(timeout, 'timeout', minval=0)
        validation.assert_int_range_if_not_none(previous_counter, 'previous_counter', minval=0)
        # Validated the same way as in wait_value()
        sleep_interval = validation.assert_float_range(sleep_interval, 'sleep_interval', minval=0)

        t1 = time.monotonic()
        entry_counter = self._update_counter if previous_counter is None else previous_counter
        while True:
            if time.monotonic() - t1 > timeout:
                raise sdk_exceptions.TimeoutException(f'Value of {self._shortname} did not update in {timeout}s')

            # NeverSet is tolerated: the first value may simply not have arrived yet
            if self._status != ValueStatus.NeverSet and self._status != ValueStatus.Valid:
                raise sdk_exceptions.InvalidValueError(self._status._get_error())

            if entry_counter != self._update_counter:
                break

            time.sleep(sleep_interval)

    def wait_value(self, value: Union[ValType, str], timeout: float, sleep_interval: float = 0.02) -> None:
        """
        Wait for the watchable to reach a given value. Raises an exception if it does not happen within a timeout value

        :param value: The value that this watchable must have to exit the wait state. A string is parsed as an enumerator name
        :param timeout: Maximum amount of time to wait for the given value
        :param sleep_interval: Value passed to ``time.sleep`` while waiting

        :raise TypeError: Given parameter not of the expected type
        :raise ValueError: Given parameter has an invalid value
        :raise InvalidValueError: If the watchable becomes invalid while waiting
        :raise TimeoutException: If the watchable value never changes for the given value within the given timeout
        """
        timeout = validation.assert_float_range(timeout, 'timeout', minval=0)
        # Report the right parameter name on a validation failure
        sleep_interval = validation.assert_float_range(sleep_interval, 'sleep_interval', minval=0)

        if isinstance(value, str):
            value = self.parse_enum_val(value)

        if value < 0 and not self.datatype.is_signed():
            raise ValueError(f"{self._shortname} is unsigned and will never have a negative value as requested")

        t1 = time.monotonic()
        while True:
            if time.monotonic() - t1 > timeout:
                raise sdk_exceptions.TimeoutException(f'Value of {self._shortname} did not set to {value} in {timeout}s')

            # Compare in the domain of the device datatype.
            # Reading the value raises InvalidValueError if it is not usable.
            if self.datatype.is_float():
                if float(value) == self.value_float:
                    break
            elif self.datatype.is_integer():
                if int(value) == self.value_int:
                    break
            elif self.datatype == EmbeddedDataType.boolean:
                if bool(value) == self.value_bool:
                    break

            time.sleep(sleep_interval)

    def has_enum(self) -> bool:
        """Tells if the watchable has an enum associated with it"""
        self._assert_configured()
        assert self._configuration is not None
        return self._configuration.enum is not None

    def get_enum(self) -> EmbeddedEnum:
        """ Returns the enum associated with this watchable

        :raises BadEnumError: If the watchable has no enum assigned
        """
        self._assert_configured()
        assert self._configuration is not None
        self._assert_has_enum()
        assert self._configuration.enum is not None
        return self._configuration.enum.copy()

    def parse_enum_val(self, val: str) -> int:
        """Converts an enum value name (string) to the underlying integer value

        :param val: The enumerator name to convert

        :raises BadEnumError: If the watchable has no enum assigned or the given value is not a valid enumerator
        :raise TypeError: Given parameter not of the expected type
        """
        validation.assert_type(val, 'val', str)
        self._assert_configured()
        assert self._configuration is not None
        self._assert_has_enum()
        assert self._configuration.enum is not None
        if not self._configuration.enum.has_value(val):
            raise sdk_exceptions.BadEnumError(f"Value {val} is not a valid value for enum {self._configuration.enum.name}")

        return self._configuration.enum.get_value(val)

    def write_value_str(self, val: str) -> None:
        """Write a value as a string and let the server parse it to a numerical value.
        Supports true/false, hexadecimal (with 0x prefix), float, int and possibly more.

        :param val: The string value

        :raise TypeError: Given parameter not of the expected type
        :raise TimeoutException: If the request times out
        :raise OperationFailure: If the request fails for any reason, including an unparsable value.
        """
        validation.assert_type(val, 'val', str)
        self._write(val, parse_enum=False)

    @property
    def display_path(self) -> str:
        """Contains the watchable full tree path"""
        return self._display_path

    @property
    def name(self) -> str:
        """Contains the watchable name, e.g. the basename in the display_path"""
        return self._shortname

    @property
    def type(self) -> WatchableType:
        """The watchable type. Variable, Alias or RuntimePublishedValue"""
        self._assert_configured()
        assert self._configuration is not None
        return self._configuration.watchable_type

    @property
    def datatype(self) -> EmbeddedDataType:
        """The data type of the device element pointed by this watchable. (sint16, float32, etc.)"""
        self._assert_configured()
        assert self._configuration is not None
        return self._configuration.datatype

    @property
    def server_id(self) -> str:
        """The unique ID assigned by the server for this watchable"""
        self._assert_configured()
        assert self._configuration is not None
        return self._configuration.server_id

    @property
    def value(self) -> ValType:
        """The latest value gotten by the client. Raises ``InvalidValueError`` when read if the value is not usable.
        Can be written, which submits a write request to the client"""
        return self._read()

    @value.setter
    def value(self, val: ValType) -> None:
        self._write(val, parse_enum=False)

    @property
    def value_bool(self) -> bool:
        """The value casted as bool"""
        return bool(self.value)

    @property
    def value_int(self) -> int:
        """The value casted as int"""
        return int(self.value)

    @property
    def value_float(self) -> float:
        """The value casted as float"""
        return float(self.value)

    @property
    def value_enum(self) -> str:
        """The value converted to its first enum name (alphabetical order). Returns a string. Can be written with a string"""
        val_int = self.value_int
        self._assert_configured()
        assert self._configuration is not None
        self._assert_has_enum()
        assert self._configuration.enum is not None
        # Sorted so that the result is deterministic when several names map to the same value
        for k in sorted(self._configuration.enum.vals.keys()):
            if self._configuration.enum.vals[k] == val_int:
                return k
        raise sdk_exceptions.InvalidValueError(
            f"Watchable {self._shortname} has value {val_int} which is not a valid enum value for enum {self._configuration.enum.name}")

    @value_enum.setter
    def value_enum(self, val: str) -> None:
        self._write(val, parse_enum=True)

    @property
    def last_update_timestamp(self) -> Optional[datetime]:
        """Time of the last value update. ``None`` if not updated at least once. Not reliable for change detection"""
        return self._last_value_dt

    @property
    def last_write_timestamp(self) -> Optional[datetime]:
        """Time of the last successful write operation. ``None`` if never written"""
        return self._last_write_dt

    @property
    def update_counter(self) -> int:
        """Number of value update gotten since the creation of the handle. Can be safely used for change detection"""
        return self._update_counter

    @property
    def is_dead(self) -> bool:
        """``True`` when the status is neither ``Valid`` nor ``NeverSet``"""
        status = ValueStatus(self._status)  # copy for atomicity
        return status not in (ValueStatus.Valid, ValueStatus.NeverSet)