# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import ExitStack
import logging
import os
import re
from typing import cast, ContextManager, Dict, Tuple, Optional, Union
import ebrains_drive # type: ignore[import]
import requests
from spinn_utilities.config_holder import (
get_config_bool, get_config_str_or_none)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY
from spinn_utilities.config_holder import get_config_str
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.constants import SCP_SCAMP_PORT
from spinnman.spalloc import (
MachineAllocationController,
SpallocClient, SpallocJob, SpallocState)
from spinnman.transceiver import Transceiver
# Module logger; messages take {}-style placeholders with positional args.
logger = FormatAdapter(logging.getLogger(__name__))
# Matches a working directory of the form ".../shared/<folder>"; the
# <folder> name is captured as group SHARED_GROUP.  Used to derive the
# EBRAINS collab folder from the current working directory.
SHARED_PATH = re.compile(r".*\/shared\/([^\/]+)")
SHARED_GROUP = 1
# Matches ".../Shared with all|groups|me/<folder>"; group 1 is the sharing
# category and the <folder> name is captured as group SHARED_WITH_GROUP.
SHARED_WITH_PATH = re.compile(r".*\/Shared with (all|groups|me)\/([^\/]+)")
SHARED_WITH_GROUP = 2
class SpallocJobController(MachineAllocationController):
    """
    A class to create and support Transceivers specific for Spalloc.

    It holds a Spalloc job together with the client that created it; the
    job is destroyed and the client closed when this controller is closed
    (or torn down before having exited).
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state",
        # the client that created the job; closed when the job is destroyed
        "__client",
        # whether connections are proxied via the spalloc server
        "__use_proxy"
    )

    def __init__(
            self, client: SpallocClient, job: SpallocJob, use_proxy: bool):
        """
        :param client: The client that created the job; closed on stop
        :param job: The allocated job; must not be None
        :param use_proxy:
            Whether transceivers and connections go through the spalloc
            proxy
        :raises TypeError: If job is None
        """
        if job is None:
            raise TypeError("must have a real job")
        self.__client = client
        self._job = job
        self._state = job.get_state()
        self.__use_proxy = use_proxy
        # The superclass is initialised last, after all fields are set.
        # NOTE(review): presumably because it may start calling _wait(),
        # which reads them - confirm against MachineAllocationController.
        super().__init__("SpallocJobController")

    @property
    def job(self) -> SpallocJob:
        """
        The job value passed into the init.
        """
        return self._job

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float) -> None:
        # Does Nothing in this allocator - machines are held until exit
        pass

    def __stop(self) -> None:
        """
        Destroy the job and close the client that created it.
        """
        self._job.destroy()
        self.__client.close()

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        super().close()
        self.__stop()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        """
        :raises ValueError: If the coordinates lie outside the machine
        """
        result = self._job.where_is_machine(x=chip_x, y=chip_y)
        if result is None:
            raise ValueError("coordinates lie outside machine")
        return result

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        """
        Wait for the job's state to change, remembering the new state.

        :return: True while the job has not been destroyed
        """
        try:
            if self._state != SpallocState.DESTROYED:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # NOTE(review): deliberately ignored; the source of this error
            # is not visible here - confirm before changing.
            pass
        except Exception:  # pylint: disable=broad-except
            # Errors are only propagated while we have not exited; a bare
            # raise keeps the original traceback intact.
            if not self._exited:
                raise
        return self._state != SpallocState.DESTROYED

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        if not self._exited:
            self.__stop()
        super()._teardown()

    def create_transceiver(self, ensure_board_is_ready: bool) -> Transceiver:
        """
        Create a Transceiver using proxy

        .. note::
            This allocation controller proxies the transceiver's connections
            via Spalloc. This allows it to work even outside the UNIMAN
            firewall.

        :param ensure_board_is_ready:
            Flag to say if ensure_board_is_ready should be run
        :returns: A proxied Transceiver
        :raises NotImplementedError: If this controller is not using a proxy
        """
        if self.__use_proxy:
            return self._job.create_transceiver(ensure_board_is_ready)
        raise NotImplementedError(
            "create transceiver only supported if using proxy")

    def can_create_transceiver(self) -> bool:
        """
        :returns: True if create_transceiver would work
        """
        return self.__use_proxy

    def open_sdp_connection(
            self, chip_x: int, chip_y: int,
            udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]:
        """
        Open a connection to a specific Ethernet-enabled SpiNNaker chip.
        Caller will have to arrange for SpiNNaker to pay attention to the
        connection.

        The coordinates will be job-relative.

        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.

        :param chip_x: Ethernet-enabled chip X coordinate
        :param chip_y: Ethernet-enabled chip Y coordinate
        :param udp_port:
            the UDP port on the chip to connect to; connecting to a non-SCP
            port will result in a connection that can't easily be configured.
        :returns:
            Connection to the chip with a known host over this port, or
            None if this controller is not using a proxy
        """
        if self.__use_proxy:
            return self._job.connect_to_board(chip_x, chip_y, udp_port)
        return None

    @property
    @overrides(MachineAllocationController.proxying)
    def proxying(self) -> bool:
        return self.__use_proxy

    def __str__(self) -> str:
        return f"SpallocJobController over {self._job}"
def __bearer_token() -> Optional[str]:
    """
    Find an OIDC bearer token for talking to the spalloc server.

    When all of ``JUPYTERHUB_API_TOKEN``, ``JUPYTERHUB_SERVICE_HOST`` and
    ``JUPYTERHUB_SERVICE_PORT`` are set, the token is fetched from the
    JupyterHub access-token service.  Otherwise the value of the
    ``OIDC_BEARER_TOKEN`` environment variable is used.

    :return:
        The OIDC bearer token, or None if none is available (including
        when the JupyterHub reply has no ``access_token`` entry)
    """
    api_token, host, port = (
        os.getenv(name) for name in (
            "JUPYTERHUB_API_TOKEN", "JUPYTERHUB_SERVICE_HOST",
            "JUPYTERHUB_SERVICE_PORT"))
    if api_token is None or host is None or port is None:
        # Not inside a suitably configured JupyterHub; plain variable it is
        return os.getenv("OIDC_BEARER_TOKEN")
    service_url = (
        f"http://{host}:{port}/services/access-token-service/access-token")
    reply = requests.get(
        service_url, headers={"Authorization": f"Token {api_token}"},
        timeout=10)
    return reply.json().get('access_token')
def __get_collab_id_from_folder(folder: str) -> Optional[Dict[str, str]]:
    """
    Look up the EBRAINS collab id of a shared Drive folder.

    Currently a hacky way to get the EBRAINS collab id from the drive
    folder, replicated from the NMPI collab template: the folder name is
    looked up as a Drive repository, and the collab id is taken from that
    repository's owner string.

    :param folder: The name of the shared Drive folder
    :return:
        ``{"collab": <collab id>}`` on success; an empty dict if there is
        not exactly one repository with that name; or None if no bearer
        token is available.
    """
    # The token getter must be *called*; the bare function object is never
    # None and is not a valid token.
    token = __bearer_token()
    if token is None:
        return None
    ebrains_drive_client = ebrains_drive.connect(token=token)
    repo_by_title = ebrains_drive_client.repos.get_repos_by_name(folder)
    if len(repo_by_title) != 1:
        # Placeholder args rather than an f-string: the logger formats with
        # str.format, so braces in the folder name must not reach the format
        logger.warning(
            "The repository for collab {} could not be found; continuing "
            "as if not in a collaboratory", folder)
        return {}
    # Owner is formatted as collab-<collab_id>-<permission>, and we want
    # to extract the <collab_id>: drop the trailing "-<permission>", then
    # everything up to and including the first "-".
    owner = repo_by_title[0].owner
    collab_id = owner[:owner.rindex("-")]
    collab_id = collab_id[collab_id.find("-") + 1:]
    logger.info("Requesting job in collaboratory {}", collab_id)
    return {"collab": collab_id}
def __group_collab_or_job() -> Dict[str, str]:
    """
    Work out what a new spalloc job should be associated with.

    Sources are tried in order:

    1. the ``NMPI_JOB_ID`` environment variable (plus ``NMPI_USER`` when
       that is set and non-empty);
    2. an EBRAINS collab derived from a shared folder in the current
       working directory;
    3. the ``[Machine] spalloc_group`` configuration setting.

    :return:
        The group, collab, or NMPI Job ID to associate with jobs, as
        keyword arguments; possibly empty
    """
    nmpi_job = os.getenv("NMPI_JOB_ID")
    if nmpi_job:
        # An NMPI job wins; the user is only attached when it is known
        nmpi_user = os.getenv("NMPI_USER")
        if not nmpi_user:
            logger.info("Requesting spalloc job for NMPI job {}", nmpi_job)
            return {"nmpi_job": nmpi_job}
        logger.info("Requesting job for NMPI job {}, user {}",
                    nmpi_job, nmpi_user)
        return {"nmpi_job": nmpi_job, "nmpi_user": nmpi_user}

    cwd = os.getcwd()
    folder_patterns = (
        (SHARED_PATH, SHARED_GROUP),
        (SHARED_WITH_PATH, SHARED_WITH_GROUP))
    for pattern, folder_group in folder_patterns:
        found = pattern.match(cwd)
        if not found:
            continue
        collab = __get_collab_id_from_folder(found.group(folder_group))
        # Any non-None answer (even an empty dict) ends the search
        if collab is not None:
            return collab

    group = get_config_str_or_none("Machine", "spalloc_group")
    return {} if group is None else {"group": group}
def spalloc_allocate_job() -> Tuple[
        str, Dict[XY, str], SpallocJobController]:
    """
    Request a machine from a new-style spalloc server.

    The request is authenticated with the available OIDC bearer token and
    associated with whichever NMPI job, collab or group can be found.

    :return: host, board address map, allocation controller
    """
    token = __bearer_token()
    association = __group_collab_or_job()
    return __spalloc_allocate_job(token, **association)
def __spalloc_allocate_job(
        bearer_token: Optional[str] = None, group: Optional[str] = None,
        collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None,
        nmpi_user: Optional[str] = None) -> Tuple[
            str, Dict[XY, str], SpallocJobController]:
    """
    Request a machine from a new-style spalloc server and wait until it is
    ready.

    The server comes from the ``[Machine] spalloc_server`` setting and the
    proxy choice from ``[Machine] spalloc_use_proxy``.  The job size is
    whatever ``SpallocClient.create_job()`` requests with no arguments
    (NOTE(review): presumably configuration-driven - not visible here).

    :param bearer_token: The bearer token to use
    :param group: The group to associate with or None for no group
    :param collab: The collab to associate with or None for no collab
    :param nmpi_job:
        The NMPI Job to associate with or None for no job; converted with
        ``int()`` before use
    :param nmpi_user: The NMPI username to associate with or None for no user
    :return: host, board address map, allocation controller
    :raises ValueError:
        If the ready job reports no board at (0, 0); the job's and client's
        context managers are exited before this propagates.
    """
    spalloc_server = get_config_str("Machine", "spalloc_server")
    use_proxy = get_config_bool("Machine", "spalloc_use_proxy")
    _nmpi_job: Optional[int] = None if nmpi_job is None else int(nmpi_job)
    with ExitStack() as stack:
        client = SpallocClient(
            spalloc_server, bearer_token=bearer_token, group=group,
            collab=collab, nmpi_job=_nmpi_job, nmpi_user=nmpi_user)
        stack.enter_context(cast(ContextManager[SpallocClient], client))
        job = client.create_job()
        stack.enter_context(job)
        job.wait_until_ready()
        connections = job.get_connections()
        root = connections.get((0, 0), None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "boards: {}",
                str(connections).replace("{", "[").replace("}", "]"))
        # Validate before ownership moves to the controller: raising here
        # lets the ExitStack unwind the job and client.  (An assert after
        # pop_all() leaked them, and is stripped under python -O.)
        if root is None:
            raise ValueError("no root of ready board")
        allocation_controller = SpallocJobController(
            client, job, use_proxy or False)
        # Success! We don't want to close the client, job or task now;
        # the allocation controller now owns them.
        stack.pop_all()
    return (root, connections, allocation_controller)