Listeners#

Introduction#

Synchronous access to watchables (variables, aliases, and RPVs), as outlined in the Accessing Variables section, can be useful. However, it has certain limitations, especially when monitoring multiple values simultaneously.

For example, to log a list of watchables, one would have to loop continuously and monitor the update_counter property of each watchable to detect changes. This approach does not guarantee that the user thread notices every change, since several updates can occur between two checks. In addition to being unreliable, this technique causes unnecessary CPU usage.
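The missed-update problem can be illustrated with a small simulation. This is a sketch that does not use the Scrutiny SDK at all: the update times, the polling period and the function name poll_values are invented for the illustration. A poller only ever sees the most recent value, so any update that is overwritten before the next poll is lost.

```python
from typing import List


def poll_values(update_times_ms: List[int], poll_period_ms: int, duration_ms: int) -> List[int]:
    """Return the indices of the updates observed by a poller that only sees the latest value."""
    seen: List[int] = []
    for now in range(0, duration_ms + 1, poll_period_ms):
        latest = None
        # Find the most recent update that happened at or before `now`
        for index, timestamp in enumerate(update_times_ms):
            if timestamp <= now:
                latest = index
        # Record it only if it differs from what the previous poll observed
        if latest is not None and (not seen or seen[-1] != latest):
            seen.append(latest)
    return seen


# Hypothetical update times: six updates, some closer together than the polling period
updates = [1, 3, 4, 12, 13, 25]
seen = poll_values(updates, poll_period_ms=10, duration_ms=30)
missed = [i for i in range(len(updates)) if i not in seen]
print("seen:", seen, "missed:", missed)
```

With a 10 ms polling period, half of the updates in this made-up sequence are never observed, which is the situation a listener is meant to avoid.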

To address this issue, the Client object can function as a Notifier. It informs a list of listeners when it receives a value update broadcast from the server. Registering a listener is done through register_listener():

ScrutinyClient.register_listener(listener)[source]#

Register a new listener. The client will notify it each time a new value update is received from the server.

Parameters:

listener (BaseListener) – The listener to register

Return type:

None


Using a listener#

To use a listener, one must create it, register it to the client, subscribe to watchable elements with subscribe(), and then start the listener.

BaseListener.subscribe(watchables)[source]#

Add one or many new watchables to the list of monitored watchables.

Parameters:

watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to add to the monitor list

Raises:
  • TypeError – Given parameter not of the expected type

  • ValueError – Given parameter has an invalid value

  • InvalidValueError – If the watchable handle is not ready to be used (not configured by the server)

Return type:

None

It is possible to unsubscribe from watchables with one of the following methods:

BaseListener.unsubscribe(watchables)[source]#

Remove one or many watchables from the list of monitored watchables.

Parameters:

watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to remove from the monitor list

Raises:
  • TypeError – Given parameter not of the expected type

  • ValueError – Given parameter has an invalid value

  • KeyError – Given watchable was not monitored previously

Return type:

None


BaseListener.unsubscribe_all()[source]#

Removes all watchables from the monitored list. Does not stop the listener

Return type:

None


BaseListener.prune_subscriptions()[source]#

Release the references to any subscribed watchables that are not being watched anymore

Return type:

None

Each listener has a start() and a stop() method. start() launches an internal thread that handles each new value update; stop() terminates that thread.

BaseListener.start()[source]#

Starts the listener thread. Once started, the subscription list can no longer be changed unless the listener allows it (see allow_subcription_changes_while_running()).

Raises:

OperationFailure – If an error occurs while starting the listener

Return type:

BaseListener


BaseListener.stop()[source]#

Stops the listener thread

Return type:

None

start() can be used in a with statement like so:

from scrutiny.sdk.client import ScrutinyClient
from scrutiny.sdk.listeners.text_stream_listener import TextStreamListener
import time

client = ScrutinyClient()
with client.connect('localhost', 1234):
    listener = TextStreamListener()     # TextStreamListener prints all updates to stdout by default
    client.register_listener(listener)  # Attach to the client

    some_var_1 = client.watch('/var/global/some_var')
    the_other_var = client.watch('/var/static/main.cpp/the_other_var')

    listener.subscribe([some_var_1, the_other_var]) # Tells the listener which watchable to listen for
    with listener.start():  # Start the listener
        # setup() has been called from the listener thread
        time.sleep(5)
    # teardown() has been called from the listener thread

    print("We are done")
# Client is automatically disconnected

Listeners may or may not allow a user to add or remove watchables from their subscription list while the listener is active. This behavior is controlled by overriding allow_subcription_changes_while_running().

Some listeners allow it (like the TextStreamListener or the BufferedReaderListener), but some do not (like the CSVFileListener). When it is not allowed, a NotAllowedError is raised if a method that changes the subscription list is called after start() has been called.

BaseListener.allow_subcription_changes_while_running()[source]#

Indicate if it is allowed to change the subscription list after the listener is started. This method can be overridden.

This applies to the methods that affect the watchable subscription list, such as subscribe(), unsubscribe() and unsubscribe_all().

Returns:

True if it is allowed to modify the subscriptions when running. False otherwise

Return type:

bool
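The effect of this method can be pictured with a simplified model. The sketch below is not the SDK implementation: ModelListener, FlexibleListener and the local NotAllowedError class are stand-ins written for this illustration, and subscriptions are plain strings instead of WatchableHandle objects. Only the method name allow_subcription_changes_while_running (spelled as in the SDK) comes from the documentation above.

```python
class NotAllowedError(Exception):
    """Local stand-in for the SDK error of the same name."""


class ModelListener:
    """Simplified model of the subscription gate; not the SDK's BaseListener."""

    def __init__(self) -> None:
        self._started = False
        self._subscriptions = set()

    def allow_subcription_changes_while_running(self) -> bool:
        # Same spelling as the SDK method. This model refuses changes by default.
        return False

    def start(self) -> "ModelListener":
        self._started = True
        return self

    def subscribe(self, name: str) -> None:
        # Changes are refused only when the listener runs AND forbids them
        if self._started and not self.allow_subcription_changes_while_running():
            raise NotAllowedError("Subscription changes are not allowed while running")
        self._subscriptions.add(name)


class FlexibleListener(ModelListener):
    def allow_subcription_changes_while_running(self) -> bool:
        return True  # Overridden: subscriptions may change after start()


strict = ModelListener().start()
try:
    strict.subscribe("/var/global/some_var")
    strict_rejected = False
except NotAllowedError:
    strict_rejected = True

flexible = FlexibleListener().start()
flexible.subscribe("/var/global/some_var")  # Accepted while running
```

In this model the decision is taken at call time, so a subclass only needs to override one method to change the policy.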


Internal behavior#

A listener runs in a separate thread and awaits value updates by monitoring a Python queue that is fed by the client object. The Python queue object internally uses condition variables, so the scheduler switch between the notifier thread and the listener thread occurs in just microseconds.

When update notifications reach the listener, they are forwarded to the listener-specific receive() method.

[Figure: _images/listener_threads.png]

Once the user thread invokes the start() method, the listener thread is launched and the setup() method is called from within this new thread.

If start() succeeds and setup() is correctly invoked, the teardown() method is guaranteed to be invoked too, irrespective of whether an exception has been raised within setup() or receive().

teardown() is called from the listener thread if the user calls stop() or if an exception occurs during setup or while listening.
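The lifecycle described above can be approximated with the standard library alone. The following is a sketch under stated assumptions, not the SDK code: ModelListener, notify() and the None sentinel used to stop the thread are choices made for this illustration. It shows a thread blocking on a queue, forwarding each batch to receive(), and running teardown() in a finally clause so that it executes even when receive() raises.

```python
import queue
import threading


class ModelListener:
    """Simplified model of a listener thread; not the SDK's BaseListener."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()  # Fed by the notifier thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.events = []  # Records the order in which the callbacks run

    def setup(self) -> None:
        self.events.append("setup")

    def receive(self, updates) -> None:
        self.events.append(("receive", updates))

    def teardown(self) -> None:
        self.events.append("teardown")

    def notify(self, updates) -> None:
        """Called from the notifier thread to hand over a batch of updates."""
        self._queue.put(updates)

    def _run(self) -> None:
        try:
            self.setup()
            while True:
                updates = self._queue.get()  # Blocks until something is queued
                if updates is None:          # Sentinel pushed by stop()
                    break
                self.receive(updates)
        except Exception as exc:
            self.events.append(("error", str(exc)))
        finally:
            self.teardown()                  # Runs on normal stop and on error

    def start(self) -> "ModelListener":
        self._thread.start()
        return self

    def stop(self) -> None:
        self._queue.put(None)
        self._thread.join()


class BadListener(ModelListener):
    def receive(self, updates) -> None:
        raise ValueError("boom")


ok = ModelListener().start()
ok.notify([1, 2])
ok.notify([3])
ok.stop()

bad = BadListener().start()
bad.notify([1])
bad.stop()
print(ok.events)
print(bad.events)
```

Because the sentinel travels through the same queue as the updates, every batch queued before stop() is delivered before the thread exits in this model.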


Writing a Listener#

To write a listener, one must create a class that inherits the BaseListener class and implements the receive() method.

class scrutiny.sdk.listeners.BaseListener[source]
class Statistics[source]

(Immutable struct) A data structure containing several useful debugging metrics for a listener

update_received_count: int

Total number of value updates received from the server. This value can grow very large

update_drop_count: int

Number of value updates that had to be dropped because of a queue overflow

update_per_sec: float

Estimated rate of updates per second, averaged over the last few seconds

internal_qsize: int

Number of elements in the internal queue

__init__(update_received_count, update_drop_count, update_per_sec, internal_qsize)
Parameters:
  • update_received_count (int) –

  • update_drop_count (int) –

  • update_per_sec (float) –

  • internal_qsize (int) –

Return type:

None

__init__(name=None, queue_max_size=1000)[source]

Base abstract class for all listeners. receive() must be overridden. setup() and teardown() can optionally be overridden.

Parameters:
  • name (str | None) – Name of the listener, used for logging purposes

  • queue_max_size (int) – Internal queue maximum size. If the queue is ever full, the update notification will be dropped

Return type:

None

process()[source]

Method periodically called inside the listener thread. Does nothing by default

Return type:

None

unsubscribe(watchables)[source]

Remove one or many watchables from the list of monitored watchables.

Parameters:

watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to remove from the monitor list

Raises:
  • TypeError – Given parameter not of the expected type

  • ValueError – Given parameter has an invalid value

  • KeyError – Given watchable was not monitored previously

Return type:

None

unsubscribe_all()[source]

Removes all watchables from the monitored list. Does not stop the listener

Return type:

None

get_subscriptions()[source]

Returns a set with all the watchables that this listener is subscribed to

Return type:

Set[WatchableHandle]

prune_subscriptions()[source]

Release the references to any subscribed watchables that are not being watched anymore

Return type:

None

allow_subcription_changes_while_running()[source]

Indicate if it is allowed to change the subscription list after the listener is started. This method can be overridden.

This applies to the methods that affect the watchable subscription list, such as subscribe(), unsubscribe() and unsubscribe_all().

Returns:

True if it is allowed to modify the subscriptions when running. False otherwise

Return type:

bool

get_stats()[source]

Returns internal performance metrics for debugging purposes

Return type:

Statistics

reset_stats()[source]

Resets the performance metrics that can be reset

Return type:

None

property is_started: bool

Tells if the listener thread is running

property name: str

The name of the listener

property error_occured: int

Tells if an error occurred while running the listener

property drop_count: int

The number of updates dropped due to a full internal queue

property update_count: int

The number of updates received (not dropped)


abstract BaseListener.receive(updates)[source]#

Method called by the listener thread each time the client notifies the listeners of one or many updates

Parameters:

updates (List[ValueUpdate]) – List of updates being broadcast

Return type:

None


The elements passed to receive() are immutable ValueUpdate objects that represent the update content.

class scrutiny.sdk.listeners.ValueUpdate[source]#

(Immutable struct) Contains the relevant information about a watchable update broadcast by the server

watchable: WatchableHandle#

A reference to the watchable object that generated the update

value: int | float | bool#

Value received in the update

update_timestamp: datetime#

Timestamp of the update. Taken by the server right after reading the device. Precise to the microsecond
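As an illustration of what a receive() implementation may look like, the sketch below tracks the minimum and maximum value seen for each watchable. To keep it runnable without a server, it uses stand-ins: FakeWatchable and FakeValueUpdate are hypothetical classes that mirror the three documented ValueUpdate fields, the path attribute is an assumption of this example, and MinMaxTracker does not inherit from BaseListener as a real listener would.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Tuple, Union


@dataclass(frozen=True)
class FakeWatchable:
    """Stand-in for WatchableHandle; `path` is a hypothetical attribute."""
    path: str


@dataclass(frozen=True)
class FakeValueUpdate:
    """Stand-in mirroring the documented ValueUpdate fields."""
    watchable: FakeWatchable
    value: Union[int, float, bool]
    update_timestamp: datetime


class MinMaxTracker:
    """Body of a would-be listener. Real code would inherit from BaseListener."""

    def __init__(self) -> None:
        self.ranges: Dict[str, Tuple[float, float]] = {}

    def receive(self, updates: List[FakeValueUpdate]) -> None:
        # One call may carry several updates, possibly for different watchables
        for update in updates:
            key = update.watchable.path
            value = float(update.value)
            low, high = self.ranges.get(key, (value, value))
            self.ranges[key] = (min(low, value), max(high, value))


some_var = FakeWatchable("/var/global/some_var")
flag = FakeWatchable("/var/global/flag")
now = datetime(2024, 1, 2, 3, 4, 5)

tracker = MinMaxTracker()
tracker.receive([
    FakeValueUpdate(some_var, 5, now),
    FakeValueUpdate(some_var, -2, now),
    FakeValueUpdate(flag, True, now),
])
print(tracker.ranges)
```

Keeping receive() short matters in a real listener, since updates keep accumulating in the internal queue while it runs.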


Two optional methods can be overridden to perform a setup and/or a teardown. If not overridden, these two methods do nothing.

BaseListener.setup()[source]#

Overridable function called by the listener from its thread when starting, before monitoring

Return type:

None


BaseListener.teardown()[source]#

Overridable function called by the listener from its thread when stopping, right after being done monitoring

Return type:

None


Performance statistics#

It is possible to get some interesting numbers about the performance of the listener using get_stats()

BaseListener.get_stats()[source]#

Returns internal performance metrics for debugging purposes

Return type:

Statistics


BaseListener.reset_stats()[source]#

Resets the performance metrics that can be reset

Return type:

None


class scrutiny.sdk.listeners.BaseListener.Statistics#

(Immutable struct) A data structure containing several useful debugging metrics for a listener

__init__(update_received_count, update_drop_count, update_per_sec, internal_qsize)#
Parameters:
  • update_received_count (int) –

  • update_drop_count (int) –

  • update_per_sec (float) –

  • internal_qsize (int) –

Return type:

None

__new__(**kwargs)#
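The relation between the queue size, the received count and the drop count can be sketched with a bounded standard queue. This is an illustrative model only: DropCounter and its push() method are invented here, and the real listener may count differently. It assumes that an update arriving while the queue is full is discarded and counted as a drop, as the queue_max_size description states.

```python
import queue


class DropCounter:
    """Simplified model of a bounded update queue; not the SDK implementation."""

    def __init__(self, queue_max_size: int) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=queue_max_size)
        self.update_count = 0  # Updates accepted into the queue
        self.drop_count = 0    # Updates discarded because the queue was full

    def push(self, update) -> None:
        try:
            self._queue.put_nowait(update)  # Never blocks the notifier thread
            self.update_count += 1
        except queue.Full:
            self.drop_count += 1

    def qsize(self) -> int:
        return self._queue.qsize()


counter = DropCounter(queue_max_size=3)
for i in range(5):  # Nobody reads the queue, so it fills up after 3 items
    counter.push(i)
print(counter.update_count, counter.drop_count, counter.qsize())
```

A growing drop count in the real statistics suggests that receive() is too slow for the update rate or that queue_max_size is too small.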

Available listeners#

There are a few listeners already available in the Scrutiny SDK.

TextStreamListener#

class scrutiny.sdk.listeners.text_stream_listener.TextStreamListener[source]#
__init__(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, *args, **kwargs)[source]#

Create a listener that writes every value update it receives into a text stream, by formatting the update into a single-line string representation of the form: <time>ms\t(<type>/<datatype>) <path>: <value>.

Where

  • <time> is the relative time in milliseconds since the listener was started

  • <type> is the watchable type : variable, alias or rpv

  • <datatype> is the value datatype, such as : sint8, float32, uint16, etc.

  • <path> is the tree path used to identify the watchable at the server level

  • <value> Value converted to text

Adding/removing subscriptions while running is allowed

Parameters:
  • stream (TextIO) – The text stream to write to. Defaults to stdout

  • args (Any) – Passed to BaseListener

  • kwargs (Any) – Passed to BaseListener
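To show what a line in this format may look like, here is a small formatting sketch. The function format_update is hypothetical and is not part of the SDK; the number of decimals used for the time and the way the value is converted to text are assumptions, so the real output may differ in those details.

```python
def format_update(elapsed_ms: float, watchable_type: str, datatype: str, path: str, value) -> str:
    """Build a line of the form <time>ms\\t(<type>/<datatype>) <path>: <value>."""
    # Two decimals for the time is an assumption of this sketch
    return f"{elapsed_ms:0.2f}ms\t({watchable_type}/{datatype}) {path}: {value}"


line = format_update(12.5, "variable", "float32", "/var/global/some_var", 3.14)
print(line)
```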

BufferedReaderListener#

class scrutiny.sdk.listeners.buffered_reader_listener.BufferedReaderListener[source]#
__init__(queue_max_size, *args, **kwargs)[source]#

Creates a listener that makes a copy of every received ValueUpdate object and pushes it into a queue, waiting for the user to read it.

Adding/removing subscriptions while running is allowed

Parameters:
  • queue_max_size (int) – Queue max size. Updates will be dropped if the queue is not read fast enough and this size is exceeded

  • args (Any) – Passed to BaseListener

  • kwargs (Any) – Passed to BaseListener

get_queue()[source]#

Returns the queue used for storage

Return type:

Queue[ValueUpdate]
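Reading from the returned queue could look like the following sketch. It assumes the object behaves like a standard queue.Queue, which the Queue[ValueUpdate] return type suggests but does not guarantee. The helper drain() is hypothetical, and plain integers stand in for ValueUpdate objects so that the example runs without a server.

```python
import queue
from typing import Any, List


def drain(q: "queue.Queue[Any]", max_items: int = 100) -> List[Any]:
    """Read up to max_items from the queue without blocking."""
    items: List[Any] = []
    while len(items) < max_items:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break  # Nothing left to read for now
    return items


q: "queue.Queue[Any]" = queue.Queue()
for value in (10, 20, 30):  # Stand-ins for ValueUpdate objects
    q.put(value)

first = drain(q, max_items=2)
rest = drain(q)
print(first, rest)
```

Capping the number of items per call keeps the user thread responsive when updates arrive faster than they are consumed.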

CSVFileListener#

class scrutiny.sdk.listeners.csv_file_listener.CSVFileListener[source]#
__init__(folder, filename, lines_per_file=None, datetime_format='%Y-%m-%d %H:%M:%S.%f', convert_bool_to_int=True, file_part_0pad=4, csv_config=None, *args, **kwargs)[source]#

Listener that writes the watchable values into a CSV file as they are received

Adding/removing subscriptions while running is not allowed since it affects the list of columns

Parameters:
  • folder (str) – Folder in which to save the CSV file

  • filename (str) – Name of the file to create

  • lines_per_file (int | None) – Maximum number of lines per file, no limit if None. When this value is set to a valid integer, the file naming pattern will be <filename>_XXXX.csv where XXXX is the part number starting from 0. When no limit is specified, a single CSV file will be created with the name <filename>.csv

  • datetime_format (str) – Format string for the datetime printed in the CSV file

  • convert_bool_to_int (bool) – When True, boolean values will be printed as 0 and 1 instead of False and True. Convenience for Excel

  • file_part_0pad (int) – When lines_per_file is set, this parameter is the number of leading 0 used to pad the filename part suffix. A value of 4 will result in files being named: my_file_0000.csv, my_file_0001.csv, and so forth

  • csv_config (CSVConfig | None) – Configuration for the CSV format

  • args (Any) – Passed to BaseListener

  • kwargs (Any) – Passed to BaseListener
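The file naming rule described by lines_per_file and file_part_0pad can be written down as a short function. This is a sketch of the documented pattern only: csv_filename is a hypothetical helper, not an SDK function, and it assumes that filename is given without the .csv extension.

```python
from typing import Optional


def csv_filename(filename: str, part: int, lines_per_file: Optional[int], file_part_0pad: int = 4) -> str:
    """Return the name of the CSV file for a given part number."""
    if lines_per_file is None:
        return f"{filename}.csv"  # No limit: a single file, no part suffix
    # Zero-pad the part number to file_part_0pad digits
    return f"{filename}_{part:0{file_part_0pad}d}.csv"


names = [csv_filename("my_file", part, lines_per_file=1000) for part in range(3)]
single = csv_filename("my_file", 0, lines_per_file=None)
print(names, single)
```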


class scrutiny.sdk.listeners.csv_file_listener.CSVConfig[source]#

CSV format options to be used by the CSVFileListener

encoding: str = 'utf8'#

File encoding

newline: str = '\n'#

CSV new line specifier

delimiter: str = ','#

CSV delimiter

quotechar: str = '"'#

CSV quote char

quoting: int = 2#

The quoting strategy. Refers to the python csv module. Default: csv.QUOTE_NONNUMERIC
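The effect of these defaults can be previewed with the Python csv module directly. The sketch below does not use CSVFileListener: the column headers and values are invented, and passing newline as the writer's lineterminator is an assumption about how the option is applied. It uses the default datetime_format of the CSVFileListener and converts a boolean to an integer, as convert_bool_to_int=True would.

```python
import csv
import io
from datetime import datetime

buffer = io.StringIO()
writer = csv.writer(
    buffer,
    delimiter=",",
    quotechar='"',
    quoting=csv.QUOTE_NONNUMERIC,  # Equal to 2, the documented default
    lineterminator="\n",           # Assumption: `newline` maps to the line terminator
)

# Default datetime_format of the CSVFileListener
timestamp = datetime(2024, 1, 2, 3, 4, 5, 678).strftime("%Y-%m-%d %H:%M:%S.%f")

writer.writerow(["Datetime", "/var/global/some_var", "/var/global/flag"])  # Invented headers
writer.writerow([timestamp, 1.5, int(True)])  # Boolean written as 1
output = buffer.getvalue()
print(output)
```

With QUOTE_NONNUMERIC, text fields are quoted and numeric fields are not, which lets a reader tell strings and numbers apart when loading the file.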