# Copyright (c) 2025 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import traceback
from typing import Dict, Optional, Tuple, Type
from spinn_utilities.abstract_base import abstractmethod
from spinn_utilities.config_holder import (
check_user_cfg, clear_cfg_files, get_config_bool, get_config_int,
get_config_str, get_config_str_or_none, is_config_none, load_config)
from spinn_utilities.configs.no_config_found_exception import (
NoConfigFoundException)
from spinn_utilities.exceptions import ConfigException
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XY
from spinn_machine import Machine
from spinn_machine.virtual_machine import virtual_machine_generator
from spinnman.data.spinnman_data_writer import SpiNNManDataWriter
from spinnman.exceptions import (
SpallocBoardUnavailableException, SpinnmanUnsupportedOperationException)
from spinnman.spalloc import is_server_address
from spinnman.spalloc.spalloc_allocator import spalloc_allocate_job
from spinnman.transceiver import Transceiver, transceiver_generator
logger = FormatAdapter(logging.getLogger(__name__))
[docs]
class AbstractSpiNNManSimulation(object):
"""
The SpiNNMan level part of the simulation interface, with abstract config.
"""
__slots__ = (
# The writer and therefore view of the global data
# access this via the property _data_writer to get the type correct
"_untyped_data_writer", )
def __init__(self, n_boards_required: Optional[int] = None,
n_chips_required: Optional[int] = None) -> None:
"""
:param n_boards_required:
`None` or the number of boards requested by the user
:param n_chips_required:
`None` or the number of chips requested by the user
"""
clear_cfg_files(False)
self._add_cfg_defaults_and_template()
load_config(self._user_cfg_file)
self._untyped_data_writer = self._data_writer_cls.setup()
self._data_writer.set_n_required(
n_boards_required, n_chips_required)
@property
def _data_writer(self) -> SpiNNManDataWriter:
return self._untyped_data_writer
@abstractmethod
def _add_cfg_defaults_and_template(self) -> None:
"""
Adds all the default cfg file and a template for the user one.
Should not add the user one named in user_cfg_file.
Should add a default named the same as user_cfg_file is applicable.
"""
@property
@abstractmethod
def _user_cfg_file(self) -> str:
"""
Name of the user cfg.
Includes the one in home which starts with a dot
as well as the one in the current directory.
"""
raise NotImplementedError("user_cfg_file not implemented")
@property
@abstractmethod
def _data_writer_cls(self) -> Type[SpiNNManDataWriter]:
"""
Type to use for the data writer.
Must be SpiNNManDataWriter or a sub class of that
"""
raise NotImplementedError("data_writer_cls is not implemented")
[docs]
def get_machine(self) -> Machine:
"""
Get the Machine. Creating it if necessary.
:returns: The Machine now stored in the DataView
"""
return self._get_known_machine()
def _get_transceiver(
self, total_run_time: Optional[float] = 0.0,
ensure_board_is_ready: bool = True) -> Transceiver:
"""
Creates a Transceiver
:param total_run_time: The total run time to request
:param ensure_board_is_ready:
Make sure the Transceiver is ready to provide Machine details
:return: The Transceiver now stored in the DataView
"""
if self._data_writer.has_transceiver():
transceiver = self._data_writer.get_transceiver()
transceiver.ensure_board_is_ready()
elif get_config_bool("Machine", "virtual_board"):
raise SpinnmanUnsupportedOperationException(
"get_transceiver with cfg virtual_board")
elif not is_config_none("Machine", "machine_name"):
transceiver = self._execute_tranceiver_by_name(
ensure_board_is_ready)
else:
transceiver = self._do_transceiver_by_remote(
total_run_time, ensure_board_is_ready=ensure_board_is_ready)
return transceiver
def _get_known_machine(
self, total_run_time: Optional[float] = 0.0) -> Machine:
"""
Gets and if needed creates a Machine
:param total_run_time: The total run time to request
:returns: The Machine
"""
if self._data_writer.has_machine():
return self._data_writer.get_machine()
if get_config_bool("Machine", "virtual_board"):
return self._execute_get_virtual_machine()
return self._do_machine_by_transciever(total_run_time)
def _do_machine_by_transciever(
self, total_run_time: Optional[float] = 0.0,
retry: int = 0) -> Machine:
"""
Gets and if needed creates a Transceiver and then Machine
:param total_run_time: The total run time to request
:param retry: The number of retries attempted
:returns: The Machine
"""
got_transciever = False
try:
transceiver = self._get_transceiver(total_run_time)
got_transciever = True
machine = transceiver.get_machine_details()
self._data_writer.set_machine(machine)
return machine
except NoConfigFoundException:
# No need to retry here
raise
except Exception as ex: # pylint: disable=broad-except
if "Failed to resolve" in str(ex):
# Likely a github actions DNS server error
raise SpallocBoardUnavailableException(
"Suspected DNS error") from ex
max_retry = get_config_int("Machine", "spalloc_retry")
if retry >= max_retry:
logger.exception(
"\n*****************************************************")
logger.exception(f"Error on machine_generation {retry=}")
logger.exception(ex)
path = self._data_writer.get_error_file()
with open(path, "a", encoding="utf-8") as f:
f.write(f"Error on machine_generation {retry=}\n")
f.write(traceback.format_exc())
if got_transciever:
logger.exception(f"{transceiver=}")
f.write(f"{transceiver=}")
max_retry = get_config_int("Machine", "spalloc_retry")
if retry >= max_retry:
raise
logger.info(f"Retrying machine_by_transciever {retry=}")
self._data_writer.clear_machine()
return self._do_machine_by_transciever(total_run_time, retry=retry + 1)
def _execute_get_virtual_machine(self) -> Machine:
"""
Runs VirtualMachineGenerator
Will set then "machine" and IP address values.
"""
machine = virtual_machine_generator()
self._data_writer.set_machine(machine)
self._data_writer.set_ipaddress("virtual")
return machine
def _execute_tranceiver_by_name(
self, ensure_board_is_ready: bool = True) -> Transceiver:
"""
Runs getting the Transceiver using machine_name.
Will create and set "transceiver" to the View
:param ensure_board_is_ready:
Flag to say if ensure_board_is_ready should be run
:returns: The Transceiver
:raises ConfigException: if machine_name is not set in the cfg
"""
machine_name = get_config_str("Machine", "machine_name")
self._data_writer.set_ipaddress(machine_name)
bmp_details = get_config_str_or_none("Machine", "bmp_names")
auto_detect_bmp = get_config_bool("Machine", "auto_detect_bmp")
reset_machine = get_config_bool(
"Machine", "reset_machine_on_startup")
transceiver = transceiver_generator(
bmp_details, auto_detect_bmp, None, reset_machine,
ensure_board_is_ready=ensure_board_is_ready)
self._data_writer.set_transceiver(transceiver)
return transceiver
def _do_transceiver_by_remote(
self, total_run_time: Optional[float],
ensure_board_is_ready: bool) -> Transceiver:
_ = total_run_time
spalloc_server = get_config_str_or_none("Machine", "spalloc_server")
if spalloc_server is None:
check_user_cfg()
raise ConfigException("cfg missing Machine values")
if is_server_address(spalloc_server):
transceiver, _ = self._execute_transceiver_by_spalloc(
ensure_board_is_ready)
return transceiver
else:
raise SpinnmanUnsupportedOperationException(
"Only new spalloc support at the SpiNNMan level")
def _execute_transceiver_by_spalloc(
self, ensure_board_is_ready: bool
) -> Tuple[Transceiver, Dict[XY, str]]:
"""
:return: Transceiver and connections (to write to provenance)
"""
ipaddress, connections, controller = spalloc_allocate_job()
self._data_writer.set_ipaddress(ipaddress)
self._data_writer.set_allocation_controller(controller)
if controller.can_create_transceiver():
transceiver = controller.create_transceiver(ensure_board_is_ready)
else:
transceiver = transceiver_generator(
bmp_details=None, auto_detect_bmp=False,
scamp_connection_data=connections,
reset_machine_on_start_up=False,
ensure_board_is_ready=ensure_board_is_ready)
self._data_writer.set_transceiver(transceiver)
return (transceiver, connections)
def _close_allocation_controller(self) -> None:
if self._data_writer.has_allocation_controller():
self._data_writer.get_allocation_controller().close()
self._data_writer.set_allocation_controller(None)
def _shutdown(self) -> None:
# stop the transceiver and allocation controller
if self._data_writer.has_transceiver():
transceiver = self._data_writer.get_transceiver()
transceiver.stop_application(self._data_writer.get_app_id())
transceiver.close()
self._close_allocation_controller()
self._data_writer.shut_down()