Shepherd-Sheep#

The shepherd-sheep API offers high-level access to shepherd’s functionality and forms the base for the two command line utilities. With the introduction of the core-lib, the API was simplified and modernized with a model-based approach. The pydantic data-models offer self-validating config-parameters with neutral defaults.

For lower-level access, have a look at the Shepherd-Sheep - Advanced. There is a third option called debug-api, used e.g. by the programmer. It will not be documented here. To learn about its functionality, consult the source.

Harvesting#

The run_harvester()-function can be used to configure all relevant hardware and software and to sample and extract data from the analog frontend.

from contextlib import ExitStack

from shepherd_core.data_models.task import HarvestTask
from shepherd_sheep.shepherd_harvester import ShepherdHarvester
from shepherd_sheep.logger import set_verbosity
def run_harvester(cfg: HarvestTask) -> bool:
    """Set up a ``ShepherdHarvester`` from ``cfg`` and run it.

    Args:
        cfg: harvest task. ``cfg.verbose`` is used for the (temporary) log
            verbosity, the complete model is handed to ``ShepherdHarvester``.

    Returns:
        ``True`` if the run was aborted by a ``SystemExit``, otherwise ``False``.

    Note:
        Exceptions other than ``SystemExit`` are not handled here. They
        propagate to the caller, but only after the exit stack was closed.
    """
    stack = ExitStack()
    set_verbosity(cfg.verbose, temporary=True)
    failed = False
    try:
        hrv = ShepherdHarvester(cfg=cfg)
        stack.enter_context(hrv)
        hrv.run()
    except SystemExit:
        failed = True
    finally:
        # close on every path - an unexpected exception must not leave
        # the entered harvester context open
        stack.close()
    return failed

The snippet is taken from the actual implementation in sheep/init and references the HarvestTask.

Emulating#

The run_emulator()-function can be used to emulate previously recorded IV data for an attached sensor node.

from contextlib import ExitStack

from shepherd_core.data_models.task import EmulationTask
from shepherd_sheep.shepherd_emulator import ShepherdEmulator
from shepherd_sheep.logger import set_verbosity
def run_emulator(cfg: EmulationTask) -> bool:
    """Set up a ``ShepherdEmulator`` from ``cfg`` and run it.

    Args:
        cfg: emulation task. ``cfg.verbose`` is used for the (temporary) log
            verbosity, the complete model is handed to ``ShepherdEmulator``.

    Returns:
        ``True`` if the run was aborted by a ``SystemExit``, otherwise ``False``.

    Note:
        Exceptions other than ``SystemExit`` are not handled here. They
        propagate to the caller, but only after the exit stack was closed.
    """
    stack = ExitStack()
    set_verbosity(cfg.verbose, temporary=True)
    failed = False
    try:
        emu = ShepherdEmulator(cfg=cfg)
        stack.enter_context(emu)
        emu.run()
    except SystemExit:
        failed = True
    finally:
        # close on every path - an unexpected exception must not leave
        # the entered emulator context open
        stack.close()
    return failed

The snippet is taken from the actual implementation in sheep/init and references the EmulationTask.

Note

TODO: add user/task-config and relink both tasks above

Modify Firmware#

The run_firmware_mod()-function can be used to customize a firmware before flashing it.

import shutil

from shepherd_core.data_models import FirmwareDType
from shepherd_core.data_models.task import FirmwareModTask
from shepherd_core.fw_tools import extract_firmware
from shepherd_core.fw_tools import firmware_to_hex
from shepherd_core.fw_tools import modify_uid
from shepherd_sheep.logger import set_verbosity
from shepherd_sheep.sysfs_interface import check_sys_access
def run_firmware_mod(cfg: FirmwareModTask) -> bool:
    """Extract the firmware described by ``cfg`` and store it at ``cfg.firmware_file``.

    For the two ELF data types the custom ID is patched in via ``modify_uid``
    and the result is converted with ``firmware_to_hex``. The resulting file
    is moved to ``cfg.firmware_file`` unless it already is at that path.

    Args:
        cfg: firmware modification task (``verbose``, ``data``, ``data_type``,
            ``firmware_file`` and ``custom_id`` are read).

    Returns:
        Always ``False`` - this function does not report a failure itself.
    """
    set_verbosity(cfg.verbose, temporary=True)
    check_sys_access()  # not really needed here

    fw_path = extract_firmware(cfg.data, cfg.data_type, cfg.firmware_file)

    elf_types = {FirmwareDType.path_elf, FirmwareDType.base64_elf}
    if cfg.data_type in elf_types:
        # only ELF input gets the custom ID and the conversion to hex
        modify_uid(fw_path, cfg.custom_id)
        fw_path = firmware_to_hex(fw_path)

    if fw_path.as_posix() == cfg.firmware_file.as_posix():
        # result is already at its destination
        return False

    shutil.move(fw_path, cfg.firmware_file)
    return False

The snippet is taken from the actual implementation in sheep/init and references the FirmwareModTask.

Program Target#

The run_programmer()-function can flash a .hex-file to a target of choice.

from contextlib import ExitStack

from shepherd_core.data_models.task import ProgrammingTask
from shepherd_sheep import sysfs_interface
from shepherd_sheep.logger import set_verbosity
from shepherd_sheep.shepherd_debug import ShepherdDebug
# Note: this snippet omits some imports - log, suffix_to_DType, FirmwareDType,
#       Path, subprocess, tempfile and time are used below
def run_programmer(cfg: ProgrammingTask) -> bool:
    """Flash the firmware file referenced by ``cfg`` to a target.

    Sequence:
        1. select ports, enable power and the io level converter via ``ShepherdDebug``
        2. set the aux voltage, wait for state "idle", load the PRU firmware
           named by ``cfg.protocol``
        3. realign the hex file with ``srec_cat`` into a temporary directory
        4. write the firmware to shared memory, configure and start the programmer
        5. poll the programmer once per second until it reports "idle" or an
           error state

    In every case the exit stack is closed and the "shepherd" PRU firmware is
    loaded again before the function is left.

    Args:
        cfg: programming task (``verbose``, ``target_port``, ``voltage``,
            ``protocol``, ``firmware_file``, ``mcu_type``, ``mcu_port``,
            ``datarate`` and ``simulate`` are read).

    Returns:
        ``True`` if realignment, initialization or programming failed,
        otherwise ``False``.

    Raises:
        subprocess.TimeoutExpired: if ``srec_cat`` does not finish within 30 s.
        Other unexpected exceptions also propagate (after the cleanup ran).
    """
    stack = ExitStack()
    set_verbosity(cfg.verbose, temporary=True)
    failed = False

    try:
        dbg = ShepherdDebug(use_io=False)  # TODO: this could all go into ShepherdDebug
        stack.enter_context(dbg)

        dbg.select_port_for_power_tracking(
            not dbg.convert_target_port_to_bool(cfg.target_port),
        )
        dbg.set_power_state_emulator(True)
        dbg.select_port_for_io_interface(cfg.target_port)
        dbg.set_io_level_converter(True)

        sysfs_interface.write_dac_aux_voltage(cfg.voltage)
        # switching target may restart pru
        sysfs_interface.wait_for_state("idle", 5)

        sysfs_interface.load_pru_firmware(cfg.protocol)
        dbg.refresh_shared_mem()  # address might have changed

        log.info("processing file %s", cfg.firmware_file.name)
        d_type = suffix_to_DType.get(cfg.firmware_file.suffix.lower())
        if d_type != FirmwareDType.base64_hex:
            log.warning("Firmware seems not to be HEX - but will try to program anyway")

        # WORKAROUND that realigns hex for misguided programmer
        path_str = cfg.firmware_file.as_posix()
        path_tmp = tempfile.TemporaryDirectory()
        stack.enter_context(path_tmp)
        file_tmp = Path(path_tmp.name) / "aligned.hex"
        # tmp_path because firmware can be in readonly content-dir
        cmd = [
            "/usr/bin/srec_cat",
            # BL51 hex files are not sorted for ascending addresses. Suppress this warning
            "-disable-sequence-warning",
            # load input HEX file
            path_str,
            "-Intel",
            # fill all incomplete 16-bit words with 0xFF. The range is limited to the application
            "-fill",
            "0xFF",
            "-within",
            path_str,
            "-Intel",
            "-range-padding",
            "4",
            # generate hex records with 16 byte data length (default 32 byte)
            "-Output_Block_Size=16",
            # generate 16-bit address records. Do not use for address ranges > 64K
            "-address-length=2",
            # generate a Intel hex file
            "-o",
            file_tmp.as_posix(),
            "-Intel",
        ]
        # stderr has to be captured, otherwise ret.stderr is always None
        ret = subprocess.run(  # noqa: S603
            cmd,
            timeout=30,
            check=False,
            stderr=subprocess.PIPE,
            text=True,
        )
        # negative return codes (process killed by a signal) are failures too
        if ret.returncode != 0:
            log.error("Error during realignment (srec_cat): %s", ret.stderr)
            failed = True
            raise SystemExit

        with file_tmp.resolve().open("rb") as fw:
            try:
                dbg.shared_mem.write_firmware(fw.read())
                target = cfg.mcu_type.lower()
                if "msp430" in target:
                    target = "msp430"
                elif "nrf52" in target:
                    target = "nrf52"
                else:
                    log.warning(
                        "MCU-Type needs to be [msp430, nrf52] but was: %s",
                        target,
                    )
                if cfg.simulate:
                    target = "dummy"
                # NOTE(review): the three trailing numbers are presumably pin
                # assignments of the respective programming port - confirm
                # against sysfs_interface.write_programmer_ctrl()
                if cfg.mcu_port == 1:
                    sysfs_interface.write_programmer_ctrl(
                        target,
                        cfg.datarate,
                        5,
                        4,
                        10,
                    )
                else:
                    sysfs_interface.write_programmer_ctrl(
                        target,
                        cfg.datarate,
                        8,
                        9,
                        11,
                    )
                log.info("Programmer initialized, will start now")
                sysfs_interface.start_programmer()
            except OSError:
                log.error("OSError - Failed to initialize Programmer")
                failed = True
            except ValueError as xpt:
                log.exception("ValueError: %s", str(xpt))
                failed = True

        # poll until the programmer is idle again (skipped if init already failed)
        state = "init"
        while state != "idle" and not failed:
            log.info(
                "Programming in progress,\tpgm_state = %s, shp_state = %s",
                state,
                sysfs_interface.get_state(),
            )
            time.sleep(1)
            state = sysfs_interface.check_programmer()
            if "error" in state:
                log.error(
                    "SystemError - Failed during Programming, p_state = %s",
                    state,
                )
                failed = True
        if failed:
            log.info("Programming - Procedure failed - will exit now!")
        else:
            log.info("Finished Programming!")
        log.debug("\tshepherdState   = %s", sysfs_interface.get_state())
        log.debug("\tprogrammerState = %s", state)
        log.debug("\tprogrammerCtrl  = %s", sysfs_interface.read_programmer_ctrl())
        dbg.process_programming_messages()
    except SystemExit:
        pass
    finally:
        # cleanup on every path: an unexpected exception (e.g. a timeout of
        # srec_cat) must neither leave the contexts open nor keep the
        # programmer firmware loaded on the PRU
        stack.close()
        sysfs_interface.load_pru_firmware("shepherd")
    return failed  # TODO: all run_() should emit error and abort_on_error should decide

The snippet is taken from the actual implementation in sheep/init and references the ProgrammingTask.

Example-Code#

This snippet shows the harvester and emulator instantiated with custom config-models. It was used as a 10h stress-test to find a memory leak.

from pathlib import Path

from shepherd_core.data_models import VirtualSourceConfig
from shepherd_core.data_models.task import EmulationTask
from shepherd_core.data_models.task import HarvestTask
from shepherd_sheep import log
from shepherd_sheep import run_emulator
from shepherd_sheep import run_harvester

# run on observer with
# sudo python3 /opt/shepherd/software/python-package/tests_manual/testbench_longrun.py

def main() -> None:
    """Run the long-duration testbench: one harvest (if missing), then two emulations.

    The recording is only created when it does not exist yet. Both emulations
    replay that recording; the first one disables power- and gpio-tracing.
    """
    runtime = 10 * 60 * 60  # s
    recordings = Path("/var/shepherd/recordings")
    path_rec = recordings / "benchmark_rec.h5"
    path_emu1 = recordings / "benchmark_emu1.h5"
    path_emu2 = recordings / "benchmark_emu2.h5"

    if not path_rec.exists():
        log.info("Start harvesting")
        task_hrv = HarvestTask(
            output_path=path_rec,
            duration=runtime,
            force_overwrite=True,
            use_cal_default=True,
        )
        run_harvester(task_hrv)

    log.info("Starting Emulation1, only logging of SysUtil-Stats")
    task_emu1 = EmulationTask(
        input_path=path_rec,
        output_path=path_emu1,
        duration=runtime,
        force_overwrite=True,
        virtual_source=VirtualSourceConfig(name="BQ25570s"),
        power_tracing=None,
        gpio_tracing=None,
        verbose=3,
    )
    run_emulator(task_emu1)

    log.info("Starting Emulation2, ")
    task_emu2 = EmulationTask(
        input_path=path_rec,
        output_path=path_emu2,
        duration=runtime,
        force_overwrite=True,
        virtual_source=VirtualSourceConfig(name="BQ25570s"),
        verbose=3,
    )
    run_emulator(task_emu2)


if __name__ == "__main__":
    main()

Source: ./tests_manual/testbench_longrun.py