# Source code for spinnman.abstract_spinnman_simulation

# Copyright (c) 2025 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import traceback
from typing import Dict, Optional, Tuple, Type


from spinn_utilities.abstract_base import abstractmethod
from spinn_utilities.config_holder import (
    check_user_cfg, clear_cfg_files, get_config_bool, get_config_int,
    get_config_str, get_config_str_or_none, is_config_none, load_config)
from spinn_utilities.configs.no_config_found_exception import (
    NoConfigFoundException)
from spinn_utilities.exceptions import ConfigException
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XY

from spinn_machine import Machine
from spinn_machine.virtual_machine import virtual_machine_generator

from spinnman.data.spinnman_data_writer import SpiNNManDataWriter
from spinnman.exceptions import (
    SpallocBoardUnavailableException, SpinnmanUnsupportedOperationException)
from spinnman.spalloc import is_server_address
from spinnman.spalloc.spalloc_allocator import spalloc_allocate_job
from spinnman.transceiver import Transceiver, transceiver_generator

# Module-level logger: the standard logger for this module wrapped in
# spinn_utilities' FormatAdapter.
logger = FormatAdapter(logging.getLogger(__name__))


class AbstractSpiNNManSimulation(object):
    """
    The SpiNNMan level part of the simulation interface, with abstract config.

    Subclasses supply the cfg defaults, the name of the user cfg file and
    the data writer class to use; this class uses them to obtain a
    Transceiver and a Machine and to shut them down again.
    """

    __slots__ = (
        # The writer and therefore view of the global data
        # access this via the property _data_writer to get the type correct
        "_untyped_data_writer", )

    def __init__(self, n_boards_required: Optional[int] = None,
                 n_chips_required: Optional[int] = None) -> None:
        """
        Loads the configuration and sets up the data writer.

        :param n_boards_required:
            `None` or the number of boards requested by the user
        :param n_chips_required:
            `None` or the number of chips requested by the user
        """
        # Order matters: the defaults must be registered before the
        # user cfg is loaded, and the cfg before the writer is set up.
        clear_cfg_files(False)
        self._add_cfg_defaults_and_template()
        load_config(self._user_cfg_file)

        self._untyped_data_writer = self._data_writer_cls.setup()
        self._data_writer.set_n_required(
            n_boards_required, n_chips_required)

    @property
    def _data_writer(self) -> SpiNNManDataWriter:
        """
        The data writer created in the constructor, with its type declared.
        """
        return self._untyped_data_writer

    @abstractmethod
    def _add_cfg_defaults_and_template(self) -> None:
        """
        Adds all the default cfg files and a template for the user one.

        Should not add the user one named in user_cfg_file.
        Should add a default named the same as user_cfg_file if applicable.
        """

    @property
    @abstractmethod
    def _user_cfg_file(self) -> str:
        """
        Name of the user cfg.

        Includes the one in home which starts with a dot
        as well as the one in the current directory.
        """
        raise NotImplementedError("user_cfg_file not implemented")

    @property
    @abstractmethod
    def _data_writer_cls(self) -> Type[SpiNNManDataWriter]:
        """
        Type to use for the data writer.

        Must be SpiNNManDataWriter or a sub class of that
        """
        raise NotImplementedError("data_writer_cls is not implemented")

    def get_machine(self) -> Machine:
        """
        Get the Machine. Creating it if necessary.

        :returns: The Machine now stored in the DataView
        """
        return self._get_known_machine()

    def _get_transceiver(
            self, total_run_time: Optional[float] = 0.0,
            ensure_board_is_ready: bool = True) -> Transceiver:
        """
        Gets the stored Transceiver or creates a new one.

        A Transceiver that is already stored is always checked with
        its ensure_board_is_ready(), whatever the flag passed in here.

        :param total_run_time: The total run time to request
        :param ensure_board_is_ready:
            Make sure a newly created Transceiver is ready to provide
            Machine details
        :return: The Transceiver now stored in the DataView
        :raises SpinnmanUnsupportedOperationException:
            If there is no Transceiver yet and the cfg asks for
            a virtual board
        """
        if self._data_writer.has_transceiver():
            transceiver = self._data_writer.get_transceiver()
            transceiver.ensure_board_is_ready()
        elif get_config_bool("Machine", "virtual_board"):
            raise SpinnmanUnsupportedOperationException(
                "get_transceiver with cfg virtual_board")
        elif not is_config_none("Machine", "machine_name"):
            transceiver = self._execute_tranceiver_by_name(
                ensure_board_is_ready)
        else:
            transceiver = self._do_transceiver_by_remote(
                total_run_time, ensure_board_is_ready=ensure_board_is_ready)
        return transceiver

    def _get_known_machine(
            self, total_run_time: Optional[float] = 0.0) -> Machine:
        """
        Gets and if needed creates a Machine

        :param total_run_time: The total run time to request
        :returns: The Machine
        """
        if self._data_writer.has_machine():
            return self._data_writer.get_machine()
        if get_config_bool("Machine", "virtual_board"):
            return self._execute_get_virtual_machine()
        return self._do_machine_by_transciever(total_run_time)

    def _do_machine_by_transciever(
            self, total_run_time: Optional[float] = 0.0,
            retry: int = 0) -> Machine:
        """
        Gets and if needed creates a Transceiver and then Machine

        On failure this is retried, by calling itself with retry + 1,
        until retry reaches the cfg value Machine/spalloc_retry.

        :param total_run_time: The total run time to request
        :param retry: The number of retries attempted
        :returns: The Machine
        :raises NoConfigFoundException:
            Passed straight on without any retry
        :raises SpallocBoardUnavailableException:
            If the error text contains "Failed to resolve"
        """
        got_transciever = False
        try:
            transceiver = self._get_transceiver(total_run_time)
            got_transciever = True
            machine = transceiver.get_machine_details()
            self._data_writer.set_machine(machine)
            return machine
        except NoConfigFoundException:
            # No need to retry here
            raise
        except Exception as ex:  # pylint: disable=broad-except
            if "Failed to resolve" in str(ex):
                # Likely a github actions DNS server error
                raise SpallocBoardUnavailableException(
                    "Suspected DNS error") from ex
            max_retry = get_config_int("Machine", "spalloc_retry")
            if retry >= max_retry:
                # Out of retries: record the failure and pass it on.
                # NOTE(review): the diagnostics are assumed to be wanted
                # only on this final attempt - confirm against upstream.
                logger.exception(
                    "\n*****************************************************")
                logger.exception(f"Error on machine_generation {retry=}")
                logger.exception(ex)
                path = self._data_writer.get_error_file()
                with open(path, "a", encoding="utf-8") as f:
                    f.write(f"Error on machine_generation {retry=}\n")
                    f.write(traceback.format_exc())
                    if got_transciever:
                        logger.exception(f"{transceiver=}")
                        # The file is appended to, so end the line
                        f.write(f"{transceiver=}\n")
                raise
        logger.info(f"Retrying machine_by_transciever {retry=}")
        self._data_writer.clear_machine()
        return self._do_machine_by_transciever(
            total_run_time, retry=retry + 1)

    def _execute_get_virtual_machine(self) -> Machine:
        """
        Runs VirtualMachineGenerator

        Will set the "machine" and IP address values.

        :returns: The virtual Machine
        """
        machine = virtual_machine_generator()
        self._data_writer.set_machine(machine)
        self._data_writer.set_ipaddress("virtual")
        return machine

    def _execute_tranceiver_by_name(
            self, ensure_board_is_ready: bool = True) -> Transceiver:
        """
        Runs getting the Transceiver using machine_name.

        Will create and set "transceiver" to the View

        :param ensure_board_is_ready:
            Flag to say if ensure_board_is_ready should be run
        :returns: The Transceiver
        :raises ConfigException: if machine_name is not set in the cfg
        """
        machine_name = get_config_str("Machine", "machine_name")
        self._data_writer.set_ipaddress(machine_name)
        bmp_details = get_config_str_or_none("Machine", "bmp_names")
        auto_detect_bmp = get_config_bool("Machine", "auto_detect_bmp")
        reset_machine = get_config_bool(
            "Machine", "reset_machine_on_startup")

        transceiver = transceiver_generator(
            bmp_details, auto_detect_bmp, None, reset_machine,
            ensure_board_is_ready=ensure_board_is_ready)
        self._data_writer.set_transceiver(transceiver)
        return transceiver

    def _do_transceiver_by_remote(
            self, total_run_time: Optional[float],
            ensure_board_is_ready: bool) -> Transceiver:
        """
        Creates a Transceiver using the spalloc server named in the cfg.

        :param total_run_time:
            The total run time to request. Not used at this level.
        :param ensure_board_is_ready:
            Flag to say if ensure_board_is_ready should be run
        :returns: The Transceiver
        :raises ConfigException:
            If the cfg has no Machine/spalloc_server value
        :raises SpinnmanUnsupportedOperationException:
            If the spalloc_server value is not a server address
        """
        _ = total_run_time
        spalloc_server = get_config_str_or_none("Machine", "spalloc_server")
        if spalloc_server is None:
            check_user_cfg()
            raise ConfigException("cfg missing Machine values")
        if not is_server_address(spalloc_server):
            raise SpinnmanUnsupportedOperationException(
                "Only new spalloc support at the SpiNNMan level")
        transceiver, _ = self._execute_transceiver_by_spalloc(
            ensure_board_is_ready)
        return transceiver

    def _execute_transceiver_by_spalloc(
            self, ensure_board_is_ready: bool
            ) -> Tuple[Transceiver, Dict[XY, str]]:
        """
        Allocates a spalloc job and creates a Transceiver for it.

        Will set the IP address, allocation controller and "transceiver"
        in the View.

        :param ensure_board_is_ready:
            Flag to say if ensure_board_is_ready should be run
        :return: Transceiver and connections (to write to provenance)
        """
        ipaddress, connections, controller = spalloc_allocate_job()
        self._data_writer.set_ipaddress(ipaddress)
        self._data_writer.set_allocation_controller(controller)
        if controller.can_create_transceiver():
            transceiver = controller.create_transceiver(ensure_board_is_ready)
        else:
            transceiver = transceiver_generator(
                bmp_details=None, auto_detect_bmp=False,
                scamp_connection_data=connections,
                reset_machine_on_start_up=False,
                ensure_board_is_ready=ensure_board_is_ready)
        self._data_writer.set_transceiver(transceiver)
        return (transceiver, connections)

    def _close_allocation_controller(self) -> None:
        """
        Closes the allocation controller, if there is one, and removes it
        from the View.
        """
        if self._data_writer.has_allocation_controller():
            self._data_writer.get_allocation_controller().close()
            self._data_writer.set_allocation_controller(None)

    def _shutdown(self) -> None:
        """
        Stops the application and releases the transceiver and allocation.

        The transceiver and the allocation controller are closed even if
        an earlier step raises; the error is still passed on, in which
        case the data writer is not shut down.
        """
        # stop the transceiver and allocation controller
        try:
            if self._data_writer.has_transceiver():
                transceiver = self._data_writer.get_transceiver()
                try:
                    transceiver.stop_application(
                        self._data_writer.get_app_id())
                finally:
                    # Do not leak the connection if the stop fails
                    transceiver.close()
        finally:
            # Do not leave the allocation held if anything above fails
            self._close_allocation_controller()
        self._data_writer.shut_down()