Listeners#
Introduction#
Synchronous access to watchables (variables, aliases, and RPVs), as outlined in the Accessing Variables section, can be useful. However, it has certain limitations, especially when monitoring multiple values simultaneously.
For example, to log a list of watchables, one would have to loop continuously and monitor the
update_counter
property to detect changes.
However, this approach does not guarantee that all changes will be noticed by the user thread.
In addition to being unreliable, this technique will cause unnecessary CPU usage.
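To make the limitation concrete, here is a small self-contained simulation. It does not use the Scrutiny SDK: FakeWatchable is a hypothetical stand-in that only mimics a value and an update counter. The polling loop checks the counter once every third update, so it observes fewer values than were produced.

```python
class FakeWatchable:
    """Hypothetical stand-in for a watchable: a value plus an update counter."""

    def __init__(self):
        self.value = None
        self.update_counter = 0

    def push(self, value):
        # Simulates a value update arriving from the server
        self.value = value
        self.update_counter += 1


def poll_every(watchable, incoming_values, poll_period):
    """Feed values into the watchable, but only poll it every `poll_period` updates."""
    seen = []
    last_counter = watchable.update_counter
    for i, v in enumerate(incoming_values, start=1):
        watchable.push(v)
        if i % poll_period == 0 and watchable.update_counter != last_counter:
            seen.append(watchable.value)    # Only the latest value is visible
            last_counter = watchable.update_counter
    return seen


produced = [10, 11, 12, 13, 14, 15]
observed = poll_every(FakeWatchable(), produced, poll_period=3)
print(f"produced={len(produced)} observed={observed}")
```

The counter tells the poller that something changed, but the intermediate values are already gone by the time it looks.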
To address this issue, the Client
object can function as a Notifier.
It informs a list of listeners when it receives a value update broadcast from the server.
Registering a listener is done through register_listener.
Using a listener#
In order to use a listener, one must create it, register it to the client, subscribe to watchable elements with
subscribe()
and then start the listener.
- BaseListener.subscribe(watchables)[source]#
Add one or many new watchables to the list of monitored watchables.
- Parameters:
watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to add to the monitor list
- Raises:
TypeError – Given parameter not of the expected type
ValueError – Given parameter has an invalid value
InvalidValueError – If the watchable handle is not ready to be used (not configured by the server)
- Return type:
None
It is possible to unsubscribe from watchables with one of the following methods:
- BaseListener.unsubscribe(watchables)[source]#
Remove one or many watchables from the list of monitored watchables.
- Parameters:
watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to remove from the monitor list
- Raises:
TypeError – Given parameter not of the expected type
ValueError – Given parameter has an invalid value
KeyError – Given watchable was not monitored previously
- Return type:
None
- BaseListener.unsubscribe_all()[source]#
Removes all watchables from the monitored list. Does not stop the listener
- Return type:
None
- BaseListener.prune_subscriptions()[source]#
Release the references to any subscribed watchables that are not being watched anymore
- Return type:
None
Each listener has a start()
and a stop()
method.
These methods launch and stop an internal thread that handles each new value update.
- BaseListener.start()[source]#
Starts the listener thread. Once started, no more subscriptions can be added.
- Raises:
OperationFailure – If an error occurs while starting the listener
- Return type:
BaseListener
start()
can be used in a with
statement like so:
from scrutiny.sdk.client import ScrutinyClient
from scrutiny.sdk.listeners.text_stream_listener import TextStreamListener
import time

client = ScrutinyClient()
with client.connect('localhost', 1234):
    listener = TextStreamListener()     # TextStreamListener prints all updates to stdout by default
    client.register_listener(listener)  # Attach to the client
    some_var_1 = client.watch('/var/global/some_var')
    the_other_var = client.watch('/var/static/main.cpp/the_other_var')
    listener.subscribe([some_var_1, the_other_var])  # Tells the listener which watchables to listen for
    with listener.start():  # Start the listener
        # setup() has been called from the listener thread
        time.sleep(5)
    # teardown() has been called from the listener thread
    print("We are done")
# Client is automatically disconnected
Listeners may or may not allow a user to add or remove watchables from their subscription list while the listener is active.
This behavior is controlled by overriding allow_subcription_changes_while_running().
Some listeners allow it (like the TextStreamListener
or the BufferedReaderListener
), but some do not (like the CSVFileListener
). When it is not allowed, a NotAllowedError
is raised if one of the following methods is called after start()
has been called.
- BaseListener.allow_subcription_changes_while_running()[source]#
Indicate if it is allowed to change the subscription list after the listener is started. This method can be overridden.
- The following methods affect the watchable subscription list:
  - subscribe()
  - unsubscribe()
  - unsubscribe_all()
  - prune_subscriptions()
- Returns:
True
if it is allowed to modify the subscriptions when running.
False
otherwise
- Return type:
bool
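The guard described above can be pictured with the following simplified model. This is only a sketch and not the SDK code: ListenerModel, FlexibleListenerModel and the local NotAllowedError are hypothetical stand-ins, and the default return value of False is an assumption made for this model only.

```python
class NotAllowedError(Exception):
    """Stand-in for the SDK exception of the same name."""


class ListenerModel:
    """Simplified, hypothetical model of the subscription guard. Not the SDK code."""

    def __init__(self):
        self._started = False
        self._subscriptions = set()

    def allow_subcription_changes_while_running(self) -> bool:
        return False    # Default chosen for this model; subclasses override to opt in

    def start(self):
        self._started = True
        return self

    def subscribe(self, watchable):
        if self._started and not self.allow_subcription_changes_while_running():
            raise NotAllowedError("Subscriptions cannot change while running")
        self._subscriptions.add(watchable)


class FlexibleListenerModel(ListenerModel):
    def allow_subcription_changes_while_running(self) -> bool:
        return True     # Opt in to live subscription changes


strict = ListenerModel().start()
flexible = FlexibleListenerModel().start()
flexible.subscribe("/var/global/some_var")      # Accepted
try:
    strict.subscribe("/var/global/some_var")    # Rejected
    rejected = False
except NotAllowedError:
    rejected = True
print(rejected)
```

A listener whose output layout depends on the subscription list, such as a CSV writer with one column per watchable, is the typical case for keeping the restriction.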
Internal behavior#
A listener runs in a separate thread and awaits value updates by monitoring a Python queue
that is fed by the
client
object.
The Python queue
object internally utilizes condition variables, which results in a scheduler switch between
the notifier thread and the listener thread occurring in just microseconds.
When update notifications reach the listener, they are forwarded to the listener-specific
receive()
method.
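The queue-based hand-off can be sketched with the standard library alone. The code below is an illustration of the mechanism, not the SDK implementation: the names updates_queue, stop_event and listener_thread are hypothetical, and plain strings stand in for value updates.

```python
import queue
import threading

updates_queue = queue.Queue(maxsize=1000)   # Fed by the notifier side
stop_event = threading.Event()
received = []


def receive(updates):
    # Listener-specific handling; this sketch simply stores the updates
    received.extend(updates)


def listener_thread():
    while not stop_event.is_set():
        try:
            # Sleeps on the queue's internal condition variable until data arrives
            batch = updates_queue.get(timeout=0.2)
        except queue.Empty:
            continue    # Timeout: loop to re-check the stop flag
        receive(batch)
        updates_queue.task_done()


thread = threading.Thread(target=listener_thread, daemon=True)
thread.start()

# Notifier side: put_nowait() never blocks; if the queue is full, the update is dropped
for batch in (["update 1"], ["update 2", "update 3"]):
    try:
        updates_queue.put_nowait(batch)
    except queue.Full:
        pass

updates_queue.join()    # Wait until every queued batch has been handled
stop_event.set()
thread.join()
print(received)
```

Because the notifier never blocks on a full queue, a slow receive() costs dropped updates rather than stalling the client.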

Once the user thread invokes the start()
method, the listener thread is launched
and the setup()
method is called from within this new thread.
If start()
succeeds and setup()
is correctly invoked, the teardown()
method is guaranteed to be invoked too,
irrespective of whether an exception has been raised within setup()
or receive().
teardown()
is called from the listener thread if the user calls
stop()
or if an exception occurs during setup or while listening.
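One common way to obtain such a guarantee is a try/finally block around the thread body. The sketch below is a hypothetical model (LifecycleModel and thread_body are invented names) and makes no claim about how the SDK is actually written; it only shows that teardown still runs when receive raises.

```python
calls = []


class LifecycleModel:
    """Hypothetical model of the listener thread body. Not the SDK implementation."""

    def setup(self):
        calls.append("setup")

    def receive(self, updates):
        calls.append("receive")
        raise RuntimeError("Simulated failure in receive()")

    def teardown(self):
        calls.append("teardown")

    def thread_body(self, batches):
        error = None
        try:
            self.setup()
            for batch in batches:
                self.receive(batch)
        except Exception as e:
            error = e           # Remembered so the user thread can inspect it
        finally:
            self.teardown()     # Always runs once setup() has been invoked
        return error


error = LifecycleModel().thread_body([["update 1"], ["update 2"]])
print(calls, repr(error))
```

The second batch is never delivered: the first failure ends the loop, and teardown runs immediately afterwards.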
Writing a Listener#
To write a listener, one must create a class that inherits from the BaseListener
class and implements
the receive()
method.
- class scrutiny.sdk.listeners.BaseListener[source]
- class Statistics[source]
(Immutable struct) A data structure containing several useful debugging metrics for a listener
- update_received_count: int
Total number of value updates received from the server. This value can grow very large
- update_drop_count: int
Number of value updates that had to be dropped because of a queue overflow
- update_per_sec: float
Estimated rate of updates per second, averaged over the last few seconds
- internal_qsize: int
Number of elements in the internal queue
- __init__(update_received_count, update_drop_count, update_per_sec, internal_qsize)
- Parameters:
update_received_count (int) –
update_drop_count (int) –
update_per_sec (float) –
internal_qsize (int) –
- Return type:
None
- __init__(name=None, queue_max_size=1000)[source]
Base abstract class for all listeners.
receive
must be overridden.
setup
and
teardown
can optionally be overridden.
- Parameters:
name (str | None) – Name of the listener used for logging purpose
queue_max_size (int) – Internal queue maximum size. If the queue is ever full, the update notification will be dropped
- Return type:
None
- process()[source]
Method periodically called inside the listener thread. Does nothing by default
- Return type:
None
- unsubscribe(watchables)[source]
Remove one or many watchables from the list of monitored watchables.
- Parameters:
watchables (WatchableHandle | Iterable[WatchableHandle]) – The list of watchables to remove from the monitor list
- Raises:
TypeError – Given parameter not of the expected type
ValueError – Given parameter has an invalid value
KeyError – Given watchable was not monitored previously
- Return type:
None
- unsubscribe_all()[source]
Removes all watchables from the monitored list. Does not stop the listener
- Return type:
None
- get_subscriptions()[source]
Returns a set with all the watchables that this listener is subscribed to
- Return type:
Set[WatchableHandle]
- prune_subscriptions()[source]
Release the references to any subscribed watchables that are not being watched anymore
- Return type:
None
- allow_subcription_changes_while_running()[source]
Indicate if it is allowed to change the subscription list after the listener is started. This method can be overridden.
- The following methods affect the watchable subscription list:
  - subscribe()
  - unsubscribe()
  - unsubscribe_all()
  - prune_subscriptions()
- Returns:
True
if it is allowed to modify the subscriptions when running.
False
otherwise
- Return type:
bool
- get_stats()[source]
Returns internal performance metrics for debugging purposes
- Return type:
BaseListener.Statistics
- reset_stats()[source]
Reset the performance metrics that can be reset
- Return type:
None
- property is_started: bool
Tells if the listener thread is running
- property name: str
The name of the listener
- property error_occured: int
Tells if an error occurred while running the listener
- property drop_count: int
The number of updates dropped due to a full internal queue
- property update_count: int
The number of updates received (not dropped)
- abstract BaseListener.receive(updates)[source]#
Method called by the listener thread each time the client notifies the listeners for one or many updates
- Parameters:
updates (List[ValueUpdate]) – List of updates being broadcast
- Return type:
None
The elements passed to receive()
are immutable ValueUpdate
objects that represent the update content.
- class scrutiny.sdk.listeners.ValueUpdate[source]#
(Immutable struct) Contains the relevant information about a watchable update broadcast by the server
- watchable: WatchableHandle#
A reference to the watchable object that generated the update
- value: int | float | bool#
Value received in the update
- update_timestamp: datetime#
Timestamp of the update. Taken by the server right after reading the device. Precise to the microsecond
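As an illustration, a minimal custom listener could look like the sketch below. LatestValueListener is a hypothetical example, not part of the SDK. The try/except around the import exists only so that the snippet runs without the SDK installed, and receive() is exercised directly with stand-in objects that expose the three ValueUpdate fields.

```python
from types import SimpleNamespace

try:
    from scrutiny.sdk.listeners import BaseListener
except ImportError:
    class BaseListener:     # Stand-in so that the sketch runs without the SDK installed
        def __init__(self, *args, **kwargs):
            pass


class LatestValueListener(BaseListener):
    """Hypothetical listener that remembers the most recent value of each watchable."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.latest = {}

    def receive(self, updates):
        # Called from the listener thread with a list of ValueUpdate objects
        for update in updates:
            self.latest[update.watchable] = (update.update_timestamp, update.value)


# Exercise receive() directly with stand-in objects exposing the ValueUpdate fields
listener = LatestValueListener()
listener.receive([
    SimpleNamespace(watchable="some_var", value=1.5, update_timestamp=None),
    SimpleNamespace(watchable="some_var", value=2.5, update_timestamp=None),
])
print(listener.latest["some_var"][1])
```

In real use, receive() runs in the listener thread, so reading latest from the user thread would need some form of synchronization.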
Two optional methods can be overridden to perform a setup
and/or
a teardown
. If not overridden, these two methods do nothing by default.
- BaseListener.setup()[source]#
Overridable function called by the listener from its thread when starting, before monitoring
- Return type:
None
Performance statistics#
It is possible to get some interesting numbers about the performance of the listener using get_stats()
- BaseListener.get_stats()[source]#
Returns internal performance metrics for debugging purposes
- Return type:
BaseListener.Statistics
- class scrutiny.sdk.listeners.BaseListener.Statistics#
(Immutable struct) A data structure containing several useful debugging metrics for a listener
- __init__(update_received_count, update_drop_count, update_per_sec, internal_qsize)#
- Parameters:
update_received_count (int) –
update_drop_count (int) –
update_per_sec (float) –
internal_qsize (int) –
- Return type:
None
- __new__(**kwargs)#
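One possible use of these numbers is a periodic health check. The helper below is a hypothetical sketch: describe_health and its 80% fill threshold are arbitrary choices for illustration, and a SimpleNamespace stands in for the object returned by get_stats().

```python
from types import SimpleNamespace


def describe_health(stats, queue_max_size=1000, fill_threshold=0.8):
    """Return a list of human-readable warnings derived from listener statistics."""
    warnings = []
    if stats.update_drop_count > 0:
        warnings.append(f"{stats.update_drop_count} update(s) dropped")
    if stats.internal_qsize >= fill_threshold * queue_max_size:
        warnings.append(f"queue is {stats.internal_qsize}/{queue_max_size} full")
    return warnings


# Stand-in for the object returned by get_stats()
stats = SimpleNamespace(update_received_count=5000, update_drop_count=12,
                        update_per_sec=250.0, internal_qsize=900)
print(describe_health(stats))
```

A non-empty result suggests that receive() is too slow for the incoming update rate, or that queue_max_size is too small.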
Available listeners#
There are a few listeners already available in the Scrutiny SDK.
TextStreamListener#
- class scrutiny.sdk.listeners.text_stream_listener.TextStreamListener[source]#
- __init__(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, *args, **kwargs)[source]#
Create a listener that writes every value update it receives into a text stream, by formatting the update into a single-line string representation of the form:
<time>ms\t(<type>/<datatype>) <path>: <value>
Where:
<time> is the relative time in milliseconds since the listener was started
<type> is the watchable type: variable, alias or rpv
<datatype> is the value datatype, such as sint8, float32, uint16, etc.
<path> is the tree path used to identify the watchable at the server level
<value> is the value converted to text
Adding/removing subscriptions while running is allowed
- Parameters:
stream (TextIO) – The text stream to write to. Defaults to
stdout
args (Any) – Passed to
BaseListener
kwargs (Any) – Passed to
BaseListener
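The line layout described above can be reproduced with a plain format string. This is a sketch only: format_update_line is a hypothetical helper, and the two-decimal formatting of the time field is an assumption, since the exact number formatting is not specified here.

```python
def format_update_line(elapsed_ms, watchable_type, datatype, path, value):
    """Build one line: <time>ms\\t(<type>/<datatype>) <path>: <value>"""
    # The 2-decimal time format is an assumption made for this sketch
    return f"{elapsed_ms:0.2f}ms\t({watchable_type}/{datatype}) {path}: {value}"


line = format_update_line(12.5, "variable", "float32", "/var/global/some_var", 3.14)
print(line)
```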
BufferedReaderListener#
- class scrutiny.sdk.listeners.buffered_reader_listener.BufferedReaderListener[source]#
- __init__(queue_max_size, *args, **kwargs)[source]#
Creates a listener that makes a copy of every received
ValueUpdate
object and pushes it into a queue, waiting for the user to read it.
Adding/removing subscriptions while running is allowed
- Parameters:
queue_max_size (int) – Queue max size. Updates will be dropped if the queue is not read fast enough and this size is exceeded
args (Any) – Passed to
BaseListener
kwargs (Any) – Passed to
BaseListener
- get_queue()[source]#
Returns the queue used for storage
- Return type:
Queue[ValueUpdate]
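A typical way to consume such a queue from the user thread is to drain it without blocking. The helper below is a hypothetical sketch that assumes the returned object behaves like a standard queue.Queue; a local queue filled with integers stands in for the one returned by get_queue().

```python
import queue


def drain(q, max_items=None):
    """Remove and return the items currently in the queue without blocking."""
    items = []
    while max_items is None or len(items) < max_items:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items


q = queue.Queue(maxsize=100)    # Stand-in for the queue returned by get_queue()
for value in (1, 2, 3):
    q.put(value)
print(drain(q))
```

Draining regularly keeps the queue below its maximum size, which avoids dropped updates.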
CSVFileListener#
- class scrutiny.sdk.listeners.csv_file_listener.CSVFileListener[source]#
- __init__(folder, filename, lines_per_file=None, datetime_format='%Y-%m-%d %H:%M:%S.%f', convert_bool_to_int=True, file_part_0pad=4, csv_config=None, *args, **kwargs)[source]#
Listener that writes the watchable values into a CSV file as they are received
Adding/removing subscriptions while running is not allowed since it affects the list of columns
- Parameters:
folder (str) – Folder in which to save the CSV file
filename (str) – Name of the file to create
lines_per_file (int | None) – Maximum number of lines per file, no limit if
None
. When this value is set to a valid integer, the file naming pattern will be
<filename>_XXXX.csv
where
XXXX
is the part number starting from 0. When no limit is specified, a single CSV file named
<filename>.csv
will be created
datetime_format (str) – Format string for the datetime printed in the CSV file
convert_bool_to_int (bool) – When
True
, boolean values will be printed as 0 and 1 instead of
False
and
True
. Convenience for Excel
file_part_0pad (int) – When
lines_per_file
is set, this parameter is the number of leading 0 used to pad the filename part suffix. A value of 4 will result in files being named: my_file_0000.csv, my_file_0001.csv, and so forth
csv_config (CSVConfig | None) – Configuration for the CSV format
args (Any) – Passed to
BaseListener
kwargs (Any) – Passed to
BaseListener
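The file naming rule for multi-part output can be expressed as a one-line format string. part_filename is a hypothetical helper written to match the description above; it is not taken from the SDK.

```python
def part_filename(filename, part, file_part_0pad=4):
    """Name of the CSV file for a given part number when lines_per_file is set."""
    return f"{filename}_{part:0{file_part_0pad}d}.csv"


print([part_filename("my_file", n) for n in range(2)])
```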
- class scrutiny.sdk.listeners.csv_file_listener.CSVConfig[source]#
CSV format options to be used by the CSVFileListener
- encoding: str = 'utf8'#
File encoding
- newline: str = '\n'#
CSV new line specifier
- delimiter: str = ','#
CSV delimiter
- quotechar: str = '"'#
CSV quote char
- quoting: int = 2#
The quoting strategy. Refer to the Python csv module. Default:
csv.QUOTE_NONNUMERIC
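To see what these defaults produce, the same options can be given to the standard csv module. The sketch below is not the listener's code: the column layout is invented for illustration, and passing the newline option as lineterminator is an assumption.

```python
import csv
import io

buffer = io.StringIO(newline="")
writer = csv.writer(buffer, delimiter=",", quotechar='"',
                    quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
writer.writerow(["Datetime", "/var/global/some_var", "/var/global/some_flag"])
writer.writerow(["2024-01-01 12:00:00.000000", 1.5, int(True)])  # bool written as 1
print(buffer.getvalue())
```

With csv.QUOTE_NONNUMERIC, text fields are quoted and numeric fields are written bare, which lets most spreadsheet tools tell the two apart.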