Use cases#
It may not be so obvious at first glance what is the purpose of the Scrutiny SDK and what can be achieved with it.
In this article, some typical interesting use cases will be explored
HIL Testing#
Hardware-In-the-Loop testing is certainly the most interesting feature enabled by the Python SDK.
Let’s consider an hypothetical device where a hardware power-up sequence must be controlled by an onboard microcontroller. Upon startup, the firmware will initialise itself then launch the power-up sequence of other hardware modules using GPIO. Additional GPIO may be used to get a feedback, an analog input too.
C++ Application#
The following piece of C++ depicts a very simplified version of what it could be. We have a PowerSupply
class that does a fictive power up sequence with
a finite state machine. Notice how this file can be built with the ENABLE_HIL_TESTING
define, allowing the application to delay its startup until Scrutiny
takes over.
/*
This example is a dummy project that shows a fictive embedded application that does a power up sequence
of a fictive power supply using a Finite State Machine. The purpose of this example is to show how we can use Scrutiny
to do Hardware In The Loop testing by controlling the flow of the application, reading/writing hardware IOs and reading
internal states.
The file is presented as a single-file project that aggregates many files. Each file is not complete on purpose. They only show
the code relevant to this example.
*/
// time.hpp
#include <cstdint>
uint32_t timestamp_ms(); // Reads an absolute monotonic timestamp.
// hardware.hpp
struct InputsOutputs{
struct{
bool psu_ready;
bool submodule1_ready;
float psu_voltage_line1;
float psu_voltage_line2;
} in;
struct {
bool enable_psu;
bool enable_submodule1;
} out;
};
void read_ios(InputsOutputs*); // Reads all inputs from the hardware
// power_supply.hpp
// Fictive class that handles a fictive power supply.
// We are interested in the power up sequence
#define VOLTAGE_THRESHOLD_5V 4.75
#define VOLTAGE_THRESHOLD_12V 11.5
#define POWERUP_TIMEOUT_MS 500
class PowerSupply
{
public:
enum class PowerUpSequenceState{
DONE_OK=0,
INIT,
PSU_ENABLE,
SUBMODULE1_ENABLE,
FAILED
};
void init(InputsOutputs *inputs_outputs);
void process();
inline bool is_success() const {
return m_actual_state == PowerUpSequenceState::DONE_OK;
}
inline bool is_error() const {
return m_actual_state == PowerUpSequenceState::FAILED;
}
private:
PowerUpSequenceState m_last_state;
PowerUpSequenceState m_actual_state;
uint32_t m_start_timestamp;
InputsOutputs *m_ios;
};
// power_supply.cpp
void PowerSupply::init(InputsOutputs *inputs_outputs)
{
m_last_state = PowerUpSequenceState::INIT;
m_actual_state = PowerUpSequenceState::INIT;
m_ios=inputs_outputs;
}
void PowerSupply::process()
{
bool const state_entry = (m_last_state != m_actual_state);
PowerUpSequenceState next_state = m_actual_state;
switch (m_actual_state){
case PowerUpSequenceState::INIT:
{
m_start_timestamp = timestamp_ms();
next_state = PowerUpSequenceState::PSU_ENABLE;
break;
}
case PowerUpSequenceState::PSU_ENABLE:
{
if (state_entry){
m_ios->out.enable_psu=true;
}
if (m_ios->in.psu_ready){
next_state = PowerUpSequenceState::SUBMODULE1_ENABLE;
}
break;
}
case PowerUpSequenceState::SUBMODULE1_ENABLE:
{
if (state_entry){
m_ios->out.enable_submodule1=true;
}
if (m_ios->in.submodule1_ready){
next_state = PowerUpSequenceState::DONE_OK;
}
break;
}
case PowerUpSequenceState::DONE_OK:
{
break; // Do something here.
}
case PowerUpSequenceState::FAILED:
{
break; // Nothing to do, wait for a reset maybe?
}
}
if (timestamp_ms() - m_start_timestamp > POWERUP_TIMEOUT_MS){
next_state = PowerUpSequenceState::FAILED;
}
m_last_state = m_actual_state;
m_actual_state = next_state;
}
// application.hpp
// Fictive application.
class Application{
public:
void init();
void process(PowerSupply* const);
};
void update_scrutiny(); // Update the scrutiny module. Not part of this example
// main.cpp
#ifdef ENABLE_HIL_TESTING
static volatile bool run_app = false; // Scrutiny will write that.
#else
constexpr bool run_app = true;
#endif
InputsOutputs inputs_outputs;
PowerSupply power_supply;
Application app;
int main()
{
app.init();
power_supply.init(&inputs_outputs);
while(true)
{
read_ios(&inputs_outputs);
if (run_app) // Wait for scrutiny to set this to true when ENABLE_HIL_TESTING is defined
{
power_supply.process();
app.process(&power_supply);
}
update_scrutiny(); // Refer to "Instrumenting a software" guide
}
return 0;
}
Python script#
An example of a Python script that tests the device could go as follow.
from scrutiny.sdk.client import ScrutinyClient
hostname = 'localhost'
port = 1234
client = ScrutinyClient()
with client.connect(hostname, port, wait_status=True): # Establish a websocket connection and wait for a first server status update
client.wait_device_ready(timeout=5)
# The following watch uses "aliases". For HIL testing, it is a good practice to keep
# the interface to the firmware abstracted to ensure the validity of this script across version of firmwares.
# Aliases are defined in the SFD, the byproduct of the SCrutiny post-build toolchain.
run_app = client.watch('/alias/app/run_app') # Maps to /var/static/main.cpp/run_app
psu_state = client.watch('/alias/power_supply/state') # Maps to /var/global/power_supply/m_actual_state
psu_last_state = client.watch('/alias/power_supply/last_state') # Maps to /var/global/power_supply/m_last_state
io_psu_ready = client.watch('/alias/ios/inputs/psu_ready') # Maps to /var/global/inputs_outputs/in/psu_ready
io_submodule1_ready = client.watch('/alias/ios/inputs/submodule1_ready') # Maps to /var/global/inputs_outputs/in/submodule1_ready
io_psu_voltage_line1 = client.watch('/alias/ios/inputs/psu_voltage_line1') # Maps to /var/global/inputs_outputs/in/psu_voltage_line1
io_psu_voltage_line2 = client.watch('/alias/ios/inputs/psu_voltage_line2') # Maps to /var/global/inputs_outputs/in/psu_voltage_line2
client.wait_new_value_for_all()
# Start of test sequence
assert io_psu_ready.value_bool == False
assert io_submodule1_ready.value_bool == False
assert io_psu_voltage_line1.value_float < 0.5
assert io_psu_voltage_line2.value_float < 0.5
run_app.value = True
psu_state.wait_value('DONE_OK', timeout=2) # string are passed as enum, will convert to a value of 0
assert io_psu_ready.value_bool == True
assert io_submodule1_ready.value_bool == True
assert io_psu_voltage_line1.value_float > 4.75
assert io_psu_voltage_line2.value_float > 11.5
print("Test passed")
End-Of-Line configuration#
EOL configuration is another intersting use case for the Scrutiny Python SDK. Let’s consider a product that requires a configuration step after manufacturing.
This configuration could be writing parameters and assembly information (model, serial number, etc) in a Non-Volatile Memory or burning a fuse in the processor. In most application, the NVM is connected to the processor, making it accessible to the firmware or a advanced JTAG.
In this example, we will show how we can add a hook in the firmware to let a remote user take control of a EEPROM
(a type of NVM) through the SDK. We will abstract the EEPROM driver under a class that have a write()
and a read()
method.
C++ Application#
// EEPROMDriver.hpp
#include <cstdint>
// Fictive driver, no need to know how it works.
class EEPROMDriver{
public:
int read(uint8_t * const buf, uint32_t const addr, uint32_t const size) const;
int write(uint8_t const * const buf, uint32_t const addr, uint32_t const size) const;
int erase();
};
// EEPROmConfigurator.hpp
class EEPROMConfigurator{
public:
static constexpr uint32_t BUFFER_SIZE=1024;
enum class Command{
None,
Read,
Write,
WriteAssemblyHeader,
Erase
};
struct AssemblyHeader {
uint8_t model;
uint8_t version;
uint8_t revision;
uint32_t serial;
};
EEPROMConfigurator(EEPROMDriver* const driver) :
m_buffer{},
m_size{0},
m_addr{0},
m_buffer_size{BUFFER_SIZE},
m_driver{driver},
m_cmd{Command::None},
m_assembly_header{},
m_last_return_code{0}
{}
void process();
private:
uint8_t m_buffer[BUFFER_SIZE];
uint32_t m_size;
uint32_t m_addr;
uint32_t m_buffer_size;
EEPROMDriver* const m_driver;
Command m_cmd;
AssemblyHeader m_assembly_header;
int m_last_return_code;
};
// EEPROmConfigurator.cpp
#include <cstring>
void EEPROMConfigurator::process()
{
// In this function, we do some action on the EEPROM based on m_cmd, a variable that will be controlled by Scrutiny
if (m_cmd == Command::None){
return;
}
m_last_return_code = 1;
if (m_cmd == Command::WriteAssemblyHeader)
{
m_last_return_code = m_driver->write(reinterpret_cast<uint8_t*>(&m_assembly_header), m_addr, sizeof(m_assembly_header));
}
else if (m_cmd == Command::Write)
{
if (m_size < BUFFER_SIZE)
{
m_last_return_code = m_driver->write(m_buffer, m_addr, m_size);
}
}
else if (m_cmd == Command::Read)
{
if (m_size < BUFFER_SIZE)
{
m_last_return_code = m_driver->read(m_buffer, m_addr, m_size);
}
}
else if (m_cmd == Command::Erase)
{
m_last_return_code = m_driver->erase();
}
m_cmd = Command::None;
}
// main.cpp
void init();
void run_application();
void update_scrutiny();
EEPROMDriver eeprom_driver;
#ifdef ENABLE_EOL_CONFIGURATOR
EEPROMConfigurator eeprom_configurator(&eeprom_driver);
#endif
int main()
{
init();
while(true)
{
#ifdef ENABLE_EOL_CONFIGURATOR
eeprom_configurator.process();
#endif
run_application();
update_scrutiny();
}
}
Python script#
from scrutiny.sdk.client import ScrutinyClient
hostname = 'localhost'
port = 1234
client = ScrutinyClient()
with client.connect(hostname, port, wait_status=True): # Establish a websocket connection and wait for a first server status update
client.wait_device_ready(timeout=5)
# The following watch uses "aliases". For HIL testing, it is a good practice to keep
# the interface to the firmware abstracted to ensure the validity of this script across version of firmwares.
# Aliases are defined in the SFD, the byproduct of the SCrutiny post-build toolchain.
eeprom_config_size = client.watch('/alias/eeprom_configurator/size') # Maps to /var/global/eeprom_configurator/m_size
eeprom_config_addr = client.watch('/alias/eeprom_configurator/addr') # Maps to /var/global/eeprom_configurator/m_addr
eeprom_config_cmd = client.watch('/alias/eeprom_configurator/cmd') # Maps to /var/global/eeprom_configurator/m_cmd
eeprom_config_return_code = client.watch('/alias/eeprom_configurator/return_code') # Maps to /var/global/eeprom_configurator/m_last_return_code
assembly_header_model = client.watch('/alias/eeprom_configurator/assembly_header/model') # Maps to /var/global/eeprom_configurator/m_assembly_header/model
assembly_header_version = client.watch('/alias/eeprom_configurator/assembly_header/version') # Maps to /var/global/eeprom_configurator/m_assembly_header/version
assembly_header_revision = client.watch('/alias/eeprom_configurator/assembly_header/revision') # Maps to /var/global/eeprom_configurator/m_assembly_header/revision
assembly_header_serial = client.watch('/alias/eeprom_configurator/assembly_header/serial') # Maps to /var/global/eeprom_configurator/m_assembly_header/serial
client.wait_new_value_for_all()
print("Writing the assembly information into the device EEPROM")
with client.batch_write():
# Order of write is respected. Block until completion at exit of "with" block
assembly_header_model.value = 1
assembly_header_version.value = 2
assembly_header_revision.value = 3
assembly_header_serial.value = 0x12345678
eeprom_config_addr.value = 0
eeprom_config_cmd.value_enum = 'WriteAssemblyHeader'
eeprom_config_cmd.wait_value('None', timeout=5)
# Make sure we do not read an old value
eeprom_config_return_code.wait_update(timeout=2)
assert eeprom_config_return_code.value_int == 0, "Write operation failed"
print("Successfully written")
We can even make something a little more advanced and encapsulate everything related to the EEPROM interaction into a EEPROMConfigurator
class.
Using the value of m_buffer
, which is a pointer, we can leverage ScrutinyClient.read_memory()
to dump the content of the EEPROM.
from scrutiny.sdk.client import ScrutinyClient
from scrutiny.sdk.watchable_handle import WatchableHandle
import io
import argparse
EEPROM_SIZE=4096 # Could come from a variable inside the eeprom driver.
# This class could be in a different file
class EepromConfiguration:
client:ScrutinyClient
size:WatchableHandle
addr:WatchableHandle
cmd:WatchableHandle
return_code:WatchableHandle
buffer_size:WatchableHandle
def __init__(self, client:ScrutinyClient):
self.client = client
self.size = client.watch('/alias/eeprom_configurator/size') # Maps to /var/global/eeprom_configurator/m_size
self.addr = client.watch('/alias/eeprom_configurator/addr') # Maps to /var/global/eeprom_configurator/m_addr
self.cmd = client.watch('/alias/eeprom_configurator/cmd') # Maps to /var/global/eeprom_configurator/m_cmd
self.return_code = client.watch('/alias/eeprom_configurator/return_code') # Maps to /var/global/eeprom_configurator/m_last_return_code
self.buffer = client.watch('/alias/eeprom_configurator/buffer') # Maps to /var/global/eeprom_configurator/m_buffer
self.buffer_size = client.watch('/alias/eeprom_configurator/buffer_size') # Maps to /var/global/eeprom_configurator/m_buffer_size
self.client.wait_new_value_for_all()
def dump_eeprom(self, f:io.BufferedWriter) -> None:
bufsize = self.buffer_size.value_int
cursor=0
while cursor < EEPROM_SIZE:
size_to_read = min(bufsize, EEPROM_SIZE-cursor)
with self.client.batch_write():
self.addr.value = cursor
self.value = size_to_read
self.cmd.value_enum='Read' # strings = enum value
self.cmd.wait_value(0, timeout=5) # self.cmd.wait_value('None', timeout=5)
self.return_code.wait_update(timeout=3)
if self.return_code.value_int != 0:
raise RuntimeError(f"Failed to read the EEPROM data at addr={cursor} with size={size_to_read}")
# self.buffer is a pointer, its value contains a memory address
data = self.client.read_memory(self.buffer.value_int, size_to_read, timeout=5) # Reads the content of the buffer
f.write(data)
cursor+=size_to_read
# dump_eeprom.py
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('filename', help="The output file")
parser.add_argument('--host', default='localhost', help="The output file")
parser.add_argument('--port', type=int, default=1234, help="The output file")
args = parser.parse_args()
client = ScrutinyClient()
with client.connect(args.hostname, args.port, wait_status=True): # Establish a websocket connection and wait for a first server status update
client.wait_device_ready(timeout=5)
configurator = EepromConfiguration(client)
with open(args.filename, 'wb') as f:
configurator.dump_eeprom(f)
print(f"Successfully dumped the device EEPROM content to {args.filename} ")
if __name__ == '__main__':
main()
Control algorithm tuning#
The following example is a bit more creative and shows how powerful Scrutiny can be when developping embedded firmware.
It illustrates a scenario where a PI controller operates within a 10KHz control loop, demonstrating how the controller’s response could be characterized automatically. For the sake of simplicity, this example does not include validation of the controller’s operational conditions (i.e., there are no error conditions or feedback range checks).
The controller in question is a PI controller with a saturated output.
The parameters can be declared as
const
orvolatile
based on a compilation optionWhen declared as
const
the compiler can optimize for speed, which would be a typical production configuration.When declared as
volatile
, the compiler will not perform any optimization, ensuring that the scrutiny input will be recognized.
The configuration of the scrutiny-embedded library is not shown in this example, as the focus is primarily on the SDK.
C++ Application#
// pi_controller_sat.hpp
#include <algorithm>
// When tuning is enabled, tunables becomes volatile variable
// preventing optimization and ensuring live updates works all the time.
#ifdef ENABLE_TUNNING
#define TUNABLE_CONST volatile
#define TUNABLE volatile
#else
#define TUNABLE_CONST const
#define TUNABLE
#endif
inline float saturate(float const v, float const min, float const max){
return std::min(std::max(v, min), max);
}
// PI controller system with saturated output
class PIControllerSat{
public:
PIControllerSat(float const ts,
float TUNABLE_CONST kp,
float TUNABLE_CONST ki,
float TUNABLE_CONST min,
float TUNABLE_CONST max,
float TUNABLE_CONST sat_margin
):
m_feedback{0.0f},
m_ref{0.0f},
m_out{0.0f},
m_state{0.0f},
m_kp{kp},
m_ki{ki},
m_min{min},
m_max{max},
m_sat_margin{sat_margin},
m_ts{ts}
{}
void reset(float val=0.0f){
m_out = val;
float const err = m_ref-m_feedback;
float const err_kp_sat = saturate(err*m_kp, m_min, m_max);
float const err_ki_ts = err*m_ki*m_ts;
m_state = 0.0f-err_ki_ts-err_kp_sat;
}
inline void set_inputs(float const feedback, float const ref){
m_feedback=feedback;
m_ref=ref;
}
void update(){
float const err = m_ref-m_feedback;
float const err_kp_sat = saturate(err*m_kp, m_min, m_max);
float const err_ki_ts = err*m_ki*m_ts;
float const pre_sat_out = saturate(err_kp_sat+err_ki_ts+m_state, m_min-m_sat_margin, m_max+m_sat_margin);
m_out = saturate(pre_sat_out, m_min, m_max);
m_state = pre_sat_out-err_kp_sat;
}
inline float out() const{
return m_out;
}
private:
// volatile when tuning is enabled. (calibration)
float TUNABLE m_feedback;
float TUNABLE m_ref;
float TUNABLE m_out;
float TUNABLE m_state;
// volatile when tuning is enabled. (calibration)
// const when tuning is disabled (production)
float TUNABLE_CONST m_kp;
float TUNABLE_CONST m_ki;
float TUNABLE_CONST m_max;
float TUNABLE_CONST m_min;
float TUNABLE_CONST m_sat_margin;
float const m_ts;
};
// main.cpp
constexpr float CONTROL_LOOP_FREQ = 10000.0f;
constexpr float CONTROLLER_KP = 2.0;
constexpr float CONTROLLER_KI = 0.1;
constexpr float CONTROLLER_MIN = 0;
constexpr float CONTROLLER_MAX = 1;
constexpr float CONTROLLER_MARGIN = 0.02;
PIControllerSat m_controller(1.0f/CONTROL_LOOP_FREQ, CONTROLLER_KP, CONTROLLER_KI, CONTROLLER_MIN, CONTROLLER_MAX, CONTROLLER_MARGIN);
float read_feedback(); // Reads the controller feedback (from an IO port)
float get_setpoint(); // Get the setpoint from another module of the software (user input, antoher algorithm)
void apply_comand(float cmd); // Apply the command computed by the controller to an output
void start_scheduler_task(void(*func)() , float freq);
void idle_task();
void init_scrutiny();
void scrutiny_run_10khz_loop_handler();
void control_task_10khz()
{
float setpoint;
#ifdef ENABLE_TUNNING
static volatile bool manual_control = false;
static volatile float manual_control_setpoint = 0.0f;
if (manual_control)
{
setpoint=manual_control_setpoint;
}
else
#endif
{
setpoint = get_setpoint();
}
m_controller.set_inputs(read_feedback(), setpoint);
m_controller.update();
apply_comand(m_controller.out());
// Scrutiny loop handler would be run here. Enables Datalogging
scrutiny_run_10khz_loop_handler();
}
int main(){
init_scrutiny(); // Configuration of datalogging and registration of the 10khz loop would happen here
start_scheduler_task(control_task_10khz, CONTROL_LOOP_FREQ);
while(true){
idle_task(); // Scrutiny Main Handler would be run here
}
}
The following python script accepts gains value (kp, ki) from the command line and initiates a characterization process. This process involves incrementally adjusting the control reference from 0 to 0.1, then 0 to 0.2, 0 to 0.3, and so on. At each step, the built-in data logger records the controller’s response to the input step. The script then saves this data to a CSV file using the Scrutiny SDK.
Python script#
from scrutiny.sdk.client import ScrutinyClient
from scrutiny.sdk.watchable_handle import WatchableHandle
from scrutiny.sdk import datalogging
import scrutiny.sdk.exceptions
import argparse
import time
class PIController:
kp:WatchableHandle
ki:WatchableHandle
max:WatchableHandle
min:WatchableHandle
sat_margin:WatchableHandle
ref:WatchableHandle
feedback:WatchableHandle
output:WatchableHandle
def __init__(self, client:ScrutinyClient, basepath:str):
basepath = basepath.rstrip('/')
self.kp = client.watch(f'{basepath}/m_kp')
self.ki = client.watch(f'{basepath}/m_ki')
self.max = client.watch(f'{basepath}/m_max')
self.min = client.watch(f'{basepath}/m_min')
self.sat_margin = client.watch(f'{basepath}/m_sat_margin')
self.ref = client.watch(f'{basepath}/m_ref')
self.feedback = client.watch(f'{basepath}/m_feedback')
self.output = client.watch(f'{basepath}/m_out')
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--kp', type=float, required=True, help="The proportional gain")
parser.add_argument('--ki', type=float, required=True, help="The integral gain")
args = parser.parse_args()
hostname = 'localhost'
port = 1234
client = ScrutinyClient()
with client.connect(hostname, port, wait_status=True): # Establish a websocket connection and wait for a first server status update
client.wait_device_ready(timeout=5)
controller = PIController(client, '/var/global/m_controller')
manual_control = client.watch("/var/static/main.cpp/control_task/manual_control")
set_point = client.watch("/var/static/main.cpp/control_task/manual_control_setpoint")
client.wait_new_value_for_all()
controller.kp.value = args.kp
controller.ki.value = args.ki
config = datalogging.DataloggingConfig(sampling_rate=0, decimation=1, timeout=0, name="MyGraph")
config.configure_trigger(datalogging.TriggerCondition.GreaterThan, [set_point, 0], position=0.1, hold_time=0)
config.configure_xaxis(datalogging.XAxisType.IdealTime)
pv_axis = config.add_axis('Process Var')
cmd_axis = config.add_axis('Command')
config.add_signal(controller.ref, pv_axis, name="Reference")
config.add_signal(controller.feedback, pv_axis, name="Feedback")
config.add_signal(controller.output, cmd_axis, name="Command")
done=False
actual_set_point=0.1
manual_control.value = True
try:
while not done:
set_point.value = 0
time.sleep(1) # Wait for stabilization. Setpoint changed to 0
print(f"Starting the datalogger for setpoint={actual_set_point}")
request = client.start_datalog(config)
set_point.value = actual_set_point
try:
# Build a filename based on the actual parameters
filename=f"controller_test_kp={args.kp:0.4f}_ki={args.ki:0.4f}_sp_{actual_set_point:0.3f}.csv"
acquisition = request.wait_and_fetch(timeout=5)
print(f"Acquisition complete. Saving to {filename}")
acquisition.to_csv(filename)
actual_set_point+=0.1
if actual_set_point > 1:
done = True
except scrutiny.sdk.exceptions.TimeoutException as e:
done=True
print(f"The datalogger failed to catch the event. {e}")
except Exception as e:
print(f"Error: {e}")
finally:
print("Done")
try:
manual_control.value = False
except: pass
if __name__ == '__main__':
main()