Source code for scrutiny.sdk.listeners.buffered_reader_listener

#    buffered_reader_listener.py
#        Create a listener that simply enqueue the updates in a queue for the user to read
#        them
#
#   - License : MIT - See LICENSE file.
#   - Project :  Scrutiny Debugger (github.com/scrutinydebugger/scrutiny-main)
#
#   Copyright (c) 2024 Scrutiny Debugger

__all__ = ['BufferedReaderListener']

from scrutiny.sdk.listeners import ValueUpdate
from . import BaseListener

import queue
from scrutiny.tools.typing import *


[docs] class BufferedReaderListener(BaseListener): _queue: "queue.Queue[ValueUpdate]"
[docs] def __init__(self, queue_max_size: int, *args: Any, **kwargs: Any): """Creates a listener that makes a copy of every received :class:`ValueUpdate<scrutiny.sdk.listeners.ValueUpdate>` object and push them into a queue, waiting for the user to read them. Adding/removing subscriptions while running is allowed :param queue_max_size: Queue max size. Updates will be dropped if the queue is not read fast enough and this size is exceeded :param args: Passed to :class:`BaseListener<scrutiny.sdk.listeners.BaseListener>` :param kwargs: Passed to :class:`BaseListener<scrutiny.sdk.listeners.BaseListener>` """ BaseListener.__init__(self, *args, **kwargs) self._queue = queue.Queue(maxsize=queue_max_size)
def receive(self, updates: List[ValueUpdate]) -> None: for update in updates: try: self._queue.put_nowait(update) except queue.Full: self._logger.warning("Queue is full. Dropping updates")
[docs] def get_queue(self) -> "queue.Queue[ValueUpdate]": """Returns the queue used for storage""" return self._queue
def allow_subscription_changes_while_running(self) -> bool: return True