Listeners#

Introduction#

Synchronous access to watchables (variables, aliases, and RPVs), as outlined in the Accessing Variables section, can be useful. However, it has certain limitations, especially when monitoring multiple values simultaneously.

For example, if one wants to log a list of watchables, it would required to continuously loop and monitor the udpate_counter property to detect changes. However, this approach does not guarantee that all changes will be noticed by the user thread. In addition to being unreliable, this technique will cause unnecessary CPU usage.

To address this issue, the Client object can function as a Notifier. It informs a list of listeners when it receives a value update broadcast from the server. Registering a listener is done through register_listener

ScrutinyClient.register_listener(listener)[source]#

Register a new listener. The client will notify it each time a new value update is received from the server

Parameters:

listener (BaseListener) – The listener to register

Return type:

None


Using a listener#

The first step in using a listener is to subscribe watchables to it by calling subscribe().

BaseListener.subscribe(watchables)[source]#

Add one or many new watchables to the list of monitored watchables. Can only be called before the listener is started.

Parameters:

watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to add to the monitor list

Raises:
  • TypeError – Given parameter not of the expected type

  • ValueError – Given parameter has an invalid value

  • OperationFailure – Failed to complete the batch write

Return type:

None

Once this is done, the listener can be started, after which, subscribing to new watchables is not allowed. Each listener has a start() and a stop() method.

BaseListener.start()[source]#

Starts the listener thread. Once started, no more subscription can be added.

Raises:

OperationFailure – If an error occur while starting the listener

Return type:

BaseListener


BaseListener.stop()[source]#

Stops the listener thread

Return type:

None

start() can be used is a with statement like so

from scrutiny.sdk.client import ScrutinyClient
from scrutiny.sdk.listeners.text_stream_listener import TextStreamListener
import time

client = ScrutinyClient()
with client.connect('localhost', 1234):
    listener = TextStreamListener()     # TextStreamListener prints all updates to stdout by default
    client.register_listener(listener)  # Attach to the client

    some_var_1 = client.watch('/var/global/some_var')
    the_other_var = client.watch('/var/static/main.cpp/the_other_var')

    listener.subscribe([some_var_1, the_other_var]) # Tells the listener which watchable to listen for
    with listener.start():  # Start the listener
        # setup() has been called from the listener thread
        time.sleep(5)
    # teardown() has been called from the listener thread

    print("We are done")
# Client is automatically disconnected

Internal behavior#

A listener runs in a separate thread and awaits value updates by monitoring a python queue that is fed by the client object. The Python queue object internally utilizes condition variables, which results in a scheduler switch between the notifier thread and the listner thread occurring in just microseconds.

When the update notification reaches the listener, they are forwarded to the listener-specific receive() method.

_images/listener_threads.png

Once the user thread invokes the start() method, the listener thread is launched and the setup() method is called from within this new thread.

If start() succeeds and setup() is correctly invoked, the teardown() method is guaranteed to be invoked too, irrespective of whether an exception has been raised within the setup() or receive().

The teardown() is called from the listener thread if the user calls stop() or if an exception occur during setup or while listening.


Writing a Listener#

To write a listener, one must create a class that inherits the BaseListener class and implements the receive() method.

class scrutiny.sdk.listeners.BaseListener[source]#
__init__(name=None, queue_max_size=1000)[source]#

Base abstract class for all listeners. receive must be overriden. setup and teardown can optionally be overriden.

Parameters:
  • name (str | None) – Name of the listener used for logging purpose

  • queue_max_size (int) – Internal queue maximum size. If the queue is ever full, the update notification will be dropped

Return type:

None

get_subscriptions()[source]#

Returns a set with all the watchables that this listener is subscribed to

Return type:

Set[WatchableHandle]

property is_started: bool#

Tells if the listener thread is running

property name: str#

The name of the listener

property drop_count: int#

The number of update dropped due to a full internal queue

property update_count: int#

The number of update received (not dropped)

property error_occured: int#

Tells if an error occured while running the listener


abstract BaseListener.receive(updates)[source]#

Method called by the listener thread each time the client notifies the listeners for one or many updates

Parameters:

updates (List[ValueUpdate]) – List of updates being broadcast

Return type:

None


The element passed to receive() are immutable ValueUpdate objects that represents the update content.

class scrutiny.sdk.listeners.ValueUpdate[source]#

(Immutable struct) Contains the relevant information about a watchable update broadcast by the server

display_path: str#

The textual tree-path used to identify watchables on the server

datatype: EmbeddedDataType#

The datatype of the watchable in the device

value: int | float | bool#

Value received in the update

update_timestamp: datetime#

Timestamp of the update

watchable_type: WatchableType#

The type of watchable (var, rpv, alias)


Two optional methods can be overriden to perform a setup and/or a teardown. If not overriden, these 2 methods will do nothing by default.

BaseListener.setup()[source]#

Overridable function called by the listener from its thread when starting, before monitoring

Return type:

None


BaseListener.teardown()[source]#

Overridable function called by the listener from its thread when stopping, right after being done monitoring

Return type:

None


Available listeners#

There is a few listeners already available in the Scrutiny SDK.

TextStreamListener#

class scrutiny.sdk.listeners.text_stream_listener.TextStreamListener[source]#
__init__(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, *args, **kwargs)[source]#

Create a listener that writes every value update it receive into a text stream, by formatting the update into a single-line string representation of the form: <time>ms\t(<type>/<datatype>) <path>: <value>.

Where

  • <time> is the relative time in millisecond since the listener has been started

  • <type> is the watchable type : variable, alias or rpv

  • <datatype> is the value datatype, such as : sint8, float32, uint16, etc.

  • <path> is the tree path used to identify the watchable at the server level

  • <value> Value converted to text

Parameters:
  • stream (TextIO) – The text stream to write to. Defaults to stdout

  • args (Any) – Passed to BaseListener

  • kwargs (Any) – Passed to BaseListener

BufferedReaderListener#

class scrutiny.sdk.listeners.buffered_reader_listener.BufferedReaderListener[source]#
__init__(*args, **kwargs)[source]#

Creates a listener that makes a copy of every received ValueUpdate object and push them into a queue, waiting for the user to read them.

Parameters:
get_queue()[source]#

Returns the queue used for storage

Return type:

SimpleQueue[ValueUpdate]

CSVFileListener#

class scrutiny.sdk.listeners.csv_file_listener.CSVFileListener[source]#
__init__(folder, filename, lines_per_file=None, datetime_format='%Y-%m-%d %H:%M:%S.%f', convert_bool_to_int=True, file_part_0pad=4, csv_config=None, *args, **kwargs)[source]#

Listener that writes the watchable values into a CSV file as they are received

Parameters:
  • folder (str) – Folder in which to save the CSV file

  • filename (str) – Name of the file to create

  • lines_per_file (int | None) – Maximum number of lines per file, no limits if None. When this value is set to a valid integer, the file naming pattern will be <filename>_XXXX.csv where XXXX is the the part number starting from 0. When no limit is specified, a single CSV file will be created following with name <filename>.csv

  • datetime_format (str) – Format string for the datetime printed in the CSV file

  • convert_bool_to_int (bool) – When True, boolean values will be printed as 0 and 1 instead of False and True. Convenience for Excel

  • file_part_0pad (int) – When lines_per_file is set, this parameter is the number of leading 0 used to pad the filename part suffix. A value of 4 will result in files being named: my_file_0000.csv, my_file_0001.csv, and so forth

  • csv_config (CSVConfig | None) – Configuration for the CSV format

  • args (Any) – Passed to BaseListener

  • kwargs (Any) – Passed to BaseListener


class scrutiny.sdk.listeners.csv_file_listener.CSVConfig[source]#

CSV format options to be used by the CSVFileListener

encoding: str = 'utf8'#

File encoding

newline: str = '\n'#

CSV new line specifier

delimiter: str = ','#

CSV delimiter

quotechar: str = '"'#

CSV quote char

quoting: int = 2#

The quoting strategy. Refers to the python csv module. Default: csv.QUOTE_NONNUMERIC