Source code for scrutiny.sdk.pending_request
# pending_request.py
# A base class for Future objects given to the suer
#
# - License : MIT - See LICENSE file
# - Project : Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
# Copyright (c) 2024 Scrutiny Debugger
__all__ = ['PendingRequest']
from datetime import datetime
import threading
import time
from scrutiny.tools import validation
from scrutiny import sdk
from scrutiny.tools.typing import *
if TYPE_CHECKING:
from scrutiny.sdk.client import ScrutinyClient
[docs]
class PendingRequest:
"""Base class for future-like request handles returned to the user.
Tracks the lifecycle of an asynchronous server request: whether it is still in flight,
whether it succeeded or failed, and the reason for any failure.
Subclasses add domain-specific result data on top of this common infrastructure.
"""
_client: "ScrutinyClient"
"""The :class:`ScrutinyClient<scrutiny.sdk.client.ScrutinyClient>` that owns this request"""
_completed: bool
"""``True`` once the request has reached a terminal state (success or failure)"""
_success: bool
"""``True`` if the request completed successfully"""
_completion_datetime: Optional[datetime]
"""Wall-clock time at which the request transitioned to its terminal state, or ``None`` if still in flight"""
_completed_event: threading.Event
"""Threading event that is set when the request reaches its terminal state"""
_failure_reason: str
"""Human-readable description of why the request failed. Empty string when incomplete or succeeded"""
_monotonic_creation_timestamp: float
"""Monotonic timestamp (seconds) recorded when the request object was created"""
_monotonic_expiration_timestamp: float
"""Monotonic timestamp (seconds) reset each time new data arrives, used to detect stale requests"""
_completion_lock: threading.Lock
"""Lock that ensures only the first completion call wins when concurrent completions race"""
def __init__(self, client: "ScrutinyClient") -> None:
self._client = client
self._completed = False
self._success = False
self._completion_datetime = None
self._completed_event = threading.Event()
self._failure_reason = ""
self._monotonic_creation_timestamp = time.monotonic()
self._monotonic_expiration_timestamp = self._monotonic_creation_timestamp
self._completion_lock = threading.Lock()
def _is_expired(self, timeout: float) -> bool:
"""Return ``True`` if no new data has arrived for longer than ``timeout`` seconds.
:param timeout: Inactivity threshold in seconds.
"""
return time.monotonic() - self._monotonic_expiration_timestamp > timeout
def _update_expiration_timer(self) -> None:
"""Reset the expiration timestamp to the current monotonic time, signalling that fresh data has arrived"""
self._monotonic_expiration_timestamp = time.monotonic()
def _mark_complete(self, success: bool, failure_reason: str = "", server_time_us: Optional[float] = None) -> None:
"""Transition the request to its terminal state.
Thread-safe: only the first call takes effect; subsequent calls are ignored.
Expected to be called by the worker thread, but any thread may call it.
:param success: ``True`` if the request succeeded, ``False`` otherwise.
:param failure_reason: Human-readable description of the failure. Ignored when ``success`` is ``True``.
:param server_time_us: Server-side completion time in microseconds. When ``None``, the local wall clock is used.
"""
# We use a lock in case there is 2 simultaneous failures, we keep the first one.
# Some client method can spawn an ephemerous thread to do client request, like upload_sfd
with self._completion_lock:
if not self._completed:
self._success = success
self._failure_reason = failure_reason
if server_time_us is None:
self._completion_datetime = datetime.now()
else:
self._completion_datetime = self._client._server_timebase.micro_to_dt(server_time_us)
self._completed = True
self._completed_event.set()
def _timeout_exception_msg(self, timeout: float) -> str:
"""Build the message for a :class:`TimeoutException<scrutiny.sdk.exceptions.TimeoutException>` when the request exceeds its deadline.
:param timeout: The timeout value in seconds that was exceeded.
"""
return f"Request did not complete in {timeout} seconds"
def _failure_exception_msg(self) -> str:
"""Build the message for an :class:`OperationFailure<scrutiny.sdk.exceptions.OperationFailure>` exception when the request has failed"""
return f"Request failed to complete. {self._failure_reason}"
[docs]
def wait_for_completion(self, timeout: Optional[float] = None) -> None:
"""Wait for the request to complete
:params timeout: Maximum wait time in seconds. Waits forever if ``None``
:raises TimeoutException: If the request does not complete in less than the specified timeout value
:raises OperationFailure: If an error happened that prevented the request to successfully complete
"""
timeout = validation.assert_float_range_if_not_none(timeout, 'timeout', minval=0)
self._completed_event.wait(timeout=timeout)
if not self._completed:
assert timeout is not None
raise sdk.exceptions.TimeoutException(self._timeout_exception_msg(timeout))
assert self._completed_event.is_set()
if not self._success:
raise sdk.exceptions.OperationFailure(self._failure_exception_msg())
@property
def completed(self) -> bool:
"""Indicates whether the request has completed or not"""
return self._completed_event.is_set()
@property
def is_success(self) -> bool:
"""Indicates whether the request has successfully completed or not"""
return self._success
@property
def completion_datetime(self) -> Optional[datetime]:
"""The time at which the request has been completed. ``None`` if not completed yet"""
return self._completion_datetime
@property
def failure_reason(self) -> str:
"""When the request failed, this property contains the reason for the failure. Empty string if not completed or succeeded"""
return self._failure_reason