Shepherd-Sheep
The shepherd-sheep API offers high level access to shepherd’s functionality and forms the base for the two command line utilities. With the introduction of the core-lib the api was simplified and modernized with a model-based approach. The pydantic data-models offer self-validating config-parameters with neutral defaults.
For lower-level access, have a look at the Shepherd-Sheep - Advanced. There is a third option called debug-api
, used i.e. by the programmer.
It will not be documented here.
To learn about the functionality the source should be consulted.
Harvesting
The run_harvester()
-function can be used to configure all relevant hardware and software and to sample and extract data from the analog frontend.
from contextlib import ExitStack
from shepherd_core.data_models.task import HarvestTask
from shepherd_sheep.shepherd_harvester import ShepherdHarvester
from shepherd_sheep.logger import set_verbosity
def run_harvester(cfg: HarvestTask) -> bool:
stack = ExitStack()
set_verbosity(state=cfg.verbose, temporary=True)
failed = True
try:
hrv = ShepherdHarvester(cfg=cfg)
stack.enter_context(hrv)
hrv.run()
failed = False
except SystemExit:
pass
except ShepherdIOError:
log.exception("Caught an unrecoverable error")
stack.close()
return failed
The snippet is taken from the actual implementation in sheep/init and references the HarvestTask
Emulating
The run_emulator()
-function can be used to emulate previously recorded IV data for an attached sensor node.
from contextlib import ExitStack
from shepherd_core.data_models.task import EmulationTask
from shepherd_sheep.shepherd_emulator import ShepherdEmulator
from shepherd_sheep.logger import set_verbosity
def run_emulator(cfg: EmulationTask) -> bool:
stack = ExitStack()
set_verbosity(state=cfg.verbose, temporary=True)
failed = True
try:
emu = ShepherdEmulator(cfg=cfg)
stack.enter_context(emu)
emu.run()
failed = False
except SystemExit:
pass
except ShepherdIOError:
log.exception("Caught an unrecoverable error")
stack.close()
return failed
The snippet is taken from the actual implementation in sheep/init and references the EmulationTask.
Note
TODO: add user/task-config and relink both tasks above
Modify Firmware
The run_firmware_mod()
-function can be used to customize a firmware before flashing it.
import shutil
from shepherd_core.data_models import FirmwareDType
from shepherd_core.data_models.task import FirmwareModTask
from shepherd_core.fw_tools import extract_firmware
from shepherd_core.fw_tools import firmware_to_hex
from shepherd_core.fw_tools import modify_uid
from shepherd_sheep.logger import set_verbosity
from shepherd_sheep.sysfs_interface import check_sys_access
def run_firmware_mod(cfg: FirmwareModTask) -> bool:
set_verbosity(state=cfg.verbose, temporary=True)
if check_sys_access(): # not really needed here
return True
file_path = extract_firmware(cfg.data, cfg.data_type, cfg.firmware_file)
if cfg.data_type in {FirmwareDType.path_elf, FirmwareDType.base64_elf}:
modify_uid(file_path, cfg.custom_id)
file_path = firmware_to_hex(file_path)
if file_path.as_posix() != cfg.firmware_file.as_posix():
shutil.move(file_path, cfg.firmware_file)
return False
The snippet is taken from the actual implementation in sheep/init and references the FirmwareModTask.
Program Target
The run_programmer()
-function can flash a .hex
-file to a target of choice.
from contextlib import ExitStack
from shepherd_core.data_models.task import ProgrammingTask
from shepherd_sheep import sysfs_interface
from shepherd_sheep.logger import set_verbosity
from shepherd_sheep.shepherd_debug import ShepherdDebug
# Note: probably some includes missing
def run_programmer(cfg: ProgrammingTask, rate_factor: float = 1.0) -> bool:
stack = ExitStack()
set_verbosity(state=cfg.verbose, temporary=True)
failed = False
try:
dbg = ShepherdDebug(use_io=False) # TODO: this could all go into ShepherdDebug
stack.enter_context(dbg)
dbg.select_port_for_power_tracking(
not dbg.convert_target_port_to_bool(cfg.target_port),
)
dbg.set_power_emulator(True)
dbg.select_port_for_io_interface(cfg.target_port)
dbg.set_power_io_level_converter(True)
sysfs_interface.write_dac_aux_voltage(cfg.voltage)
# switching target may restart pru
sysfs_interface.wait_for_state("idle", 5)
sysfs_interface.load_pru_firmware(cfg.protocol)
dbg.refresh_shared_mem() # address might have changed
log.info("processing file %s", cfg.firmware_file.name)
d_type = suffix_to_DType.get(cfg.firmware_file.suffix.lower())
if d_type != FirmwareDType.base64_hex:
log.warning("Firmware seems not to be HEX - but will try to program anyway")
# derive target-info
target = cfg.mcu_type.lower()
if "msp430" in target:
target = "msp430"
elif "nrf52" in target:
target = "nrf52"
else:
log.warning(
"MCU-Type needs to be [msp430, nrf52] but was: %s",
target,
)
# WORKAROUND that realigns hex for misguided programmer
path_str = cfg.firmware_file.as_posix()
path_tmp = tempfile.TemporaryDirectory()
stack.enter_context(path_tmp)
file_tmp = Path(path_tmp.name) / "aligned.hex"
# tmp_path because firmware can be in readonly content-dir
cmd = [
"/usr/bin/srec_cat",
# BL51 hex files are not sorted for ascending addresses. Suppress this warning
"-disable-sequence-warning",
# load input HEX file
path_str,
"-Intel",
# fill all incomplete 16-bit words with 0xFF. The range is limited to the application
"-fill=0xFF",
"-within",
path_str,
"-Intel",
"-range-padding=4",
# generate hex records with 16 byte data length (default 32 byte)
"-Output_Block_Size=16",
# generate 16- or 32-bit address records. Do not use 16-bit for address ranges > 64K
f"-address-length={2 if 'msp' in target else 4}",
# generate a Intel hex file
"-o",
file_tmp.as_posix(),
"-Intel",
]
ret = subprocess.run(cmd, timeout=30, check=False) # noqa: S603
if ret.returncode > 0:
log.error("Error during realignment (srec_cat): %s", ret.stderr)
failed = True
raise SystemExit # noqa: TRY301
if not (0.1 <= rate_factor <= 1.0):
raise ValueError("Scaler for data-rate must be between 0.1 and 1.0")
_data_rate = int(rate_factor * cfg.datarate)
with file_tmp.resolve().open("rb") as fw:
try:
dbg.shared_mem.write_firmware(fw.read())
if cfg.simulate:
target = "dummy"
if cfg.mcu_port == 1:
sysfs_interface.write_programmer_ctrl(
target,
_data_rate,
5,
4,
10,
)
else:
sysfs_interface.write_programmer_ctrl(
target,
_data_rate,
8,
9,
11,
)
log.info(
"Programmer initialized, will start now (data-rate = %d bit/s)", _data_rate
)
sysfs_interface.start_programmer()
except OSError:
log.error("OSError - Failed to initialize Programmer")
failed = True
except ValueError as xpt:
log.exception("ValueError: %s", str(xpt))
failed = True
state = "init"
while state != "idle" and not failed:
log.info(
"Programming in progress,\tpgm_state = %s, shp_state = %s",
state,
sysfs_interface.get_state(),
)
time.sleep(1)
state = sysfs_interface.check_programmer()
if "error" in state:
log.error(
"SystemError - Failed during Programming, p_state = %s",
state,
)
failed = True
if failed:
log.info("Programming - Procedure failed - will exit now!")
else:
log.info("Finished Programming!")
log.debug("\tshepherdState = %s", sysfs_interface.get_state())
log.debug("\tprogrammerState = %s", state)
log.debug("\tprogrammerCtrl = %s", sysfs_interface.read_programmer_ctrl())
dbg.process_programming_messages()
except SystemExit:
pass
stack.close()
sysfs_interface.load_pru_firmware("pru0-shepherd-EMU")
sysfs_interface.load_pru_firmware("pru1-shepherd")
return failed # TODO: all run_() should emit error and abort_on_error should decide
The snippet is taken from the actual implementation in sheep/init and references the ProgrammingTask.
Example-Code
This snippet shows the harvester and emulator instantiated with custom config-models. It was used as a 10h stress-test to find a memory leak.
from pathlib import Path
from shepherd_core.data_models import VirtualSourceConfig
from shepherd_core.data_models.task import EmulationTask
from shepherd_core.data_models.task import HarvestTask
from shepherd_sheep import log
from shepherd_sheep import run_emulator
from shepherd_sheep import run_harvester
# run on observer with
# sudo python3 /opt/shepherd/software/python-package/tests_manual/testbench_longrun.py
if __name__ == "__main__":
duration = 10 * 60 * 60 # s
benchmark_path = Path("/var/shepherd/recordings")
file_rec = benchmark_path / "benchmark_rec.h5"
file_emu1 = benchmark_path / "benchmark_emu1.h5"
file_emu2 = benchmark_path / "benchmark_emu2.h5"
if not file_rec.exists():
log.info("Start harvesting")
hrv = HarvestTask(
output_path=file_rec,
duration=duration,
force_overwrite=True,
use_cal_default=True,
)
run_harvester(hrv)
log.info("Starting Emulation1, only logging of SysUtil-Stats")
emu1 = EmulationTask(
input_path=file_rec,
output_path=file_emu1,
duration=duration,
force_overwrite=True,
virtual_source=VirtualSourceConfig(name="BQ25570s"),
power_tracing=None,
gpio_tracing=None,
verbose=3,
)
run_emulator(emu1)
log.info("Starting Emulation2, ")
emu2 = EmulationTask(
input_path=file_rec,
output_path=file_emu2,
duration=duration,
force_overwrite=True,
virtual_source=VirtualSourceConfig(name="BQ25570s"),
verbose=3,
)
run_emulator(emu2)