# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Implementation of the client for the Spalloc web service.
"""
from contextlib import contextmanager
from logging import getLogger
from multiprocessing import Process, Queue
import queue
import struct
import threading
from time import sleep
from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable,
Iterator, List, Mapping, Optional, Tuple, cast)
from urllib.parse import urlparse, urlunparse, ParseResult
from packaging.version import Version
import requests
from typing_extensions import TypeAlias
from websocket import WebSocket # type: ignore
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
from spinn_utilities.abstract_context_manager import AbstractContextManager
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XY
from spinn_utilities.typing.json import JsonObject, JsonValue
from spinn_utilities.overrides import overrides
from spinnman.connections.udp_packet_connections import UDPConnection
from spinnman.connections.abstract_classes import Connection, Listenable
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
from spinnman.exceptions import SpinnmanTimeoutException
from spinnman.exceptions import SpallocException
from spinnman.transceiver import (
Transceiver, create_transceiver_from_connections)
from .abstract_spalloc_client import AbstractSpallocClient
from .proxy_protocol import ProxyProtocol
from .session import Session, SessionAware
from .spalloc_boot_connection import SpallocBootConnection
from .spalloc_eieio_connection import SpallocEIEIOConnection
from .spalloc_eieio_listener import SpallocEIEIOListener
from .spalloc_job import SpallocJob
from .spalloc_machine import SpallocMachine
from .spalloc_proxied_connection import SpallocProxiedConnection
from .spalloc_scp_connection import SpallocSCPConnection
from .spalloc_state import SpallocState
from .utils import parse_service_url, get_hostname
# Module-level logger; messages in this file use "{}" placeholders.
logger = FormatAdapter(getLogger(__name__))

# Binary layouts of the proxy protocol messages. In struct notation "<" is
# little-endian without padding, "I" an unsigned 32-bit word and "B" an
# unsigned byte. Every request starts with the protocol message code
# followed by the correlation ID issued for the call.

# OPEN request: code, correlation ID, then the x, y and port of the target.
_open_req = struct.Struct("<IIIII")
# CLOSE request: code, correlation ID, then the channel handle to close.
_close_req = struct.Struct("<III")
# OPEN_UNBOUND request: code and correlation ID only.
_open_listen_req = struct.Struct("<II")
# Open and close share the response structure
# (code, correlation ID, channel handle).
_open_close_res = struct.Struct("<III")
# OPEN_UNBOUND response: code, correlation ID, channel handle, the four
# bytes of an IPv4 address, then a port number.
_open_listen_res = struct.Struct("<IIIBBBBI")
# Header of a one-way message: code, then channel handle.
_msg = struct.Struct("<II")
# Header of an addressed one-way message: code, channel handle, x, y, port.
_msg_to = struct.Struct("<IIIII")
def fix_url(url: Any) -> str:
    """
    Normalise a URL so that it uses HTTPS and its path ends with a slash.

    :param str url: original url
    :return: the same URL with the scheme forced to ``https`` and a
        trailing ``/`` appended to the path if it was missing
    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)
    if scheme != 'https':
        scheme = "https"
    if not path.endswith("/"):
        path = path + "/"
    return urlunparse(
        ParseResult(scheme, netloc, path, params, query, fragment))
class SpallocClient(AbstractContextManager, AbstractSpallocClient):
    """
    Basic client library for talking to new Spalloc.
    """
    __slots__ = ("__session",
                 "__machines_url", "__jobs_url", "version",
                 "__group", "__collab", "__nmpi_job", "__nmpi_user")

    def __init__(
            self, service_url: str,
            username: Optional[str] = None, password: Optional[str] = None,
            bearer_token: Optional[str] = None,
            group: Optional[str] = None, collab: Optional[str] = None,
            nmpi_job: Optional[int] = None, nmpi_user: Optional[str] = None):
        """
        :param str service_url: The reference to the service.
            May have username and password supplied as part of the network
            location; if so, the ``username`` and ``password`` arguments
            *must* be ``None``. If ``username`` and ``password`` are not given,
            not even within the URL, the ``bearer_token`` must be not ``None``.
        :param str username: The user name to use
        :param str password: The password to use
        :param str bearer_token: The bearer token to use
        :param str group:
            If not ``None``, sent as ``group`` with every job creation.
        :param str collab:
            If not ``None``, sent as ``nmpi-collab`` with every job creation.
        :param int nmpi_job:
            If not ``None``, sent as ``nmpi-job-id`` with every job creation.
        :param str nmpi_user:
            If not ``None``, sent as ``owner`` with every job creation.
        """
        # Only look for credentials in the URL when none were given directly
        if username is None and password is None:
            service_url, username, password = parse_service_url(service_url)
        self.__session: Optional[Session] = Session(
            service_url, username, password, bearer_token)
        # The renewal response describes the service: its version and the
        # locations of the machine and job collections
        obj = self.__session.renew()
        v = cast(JsonObject, obj["version"])
        self.version = Version(
            f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
        self.__machines_url = fix_url(obj["machines-ref"])
        self.__jobs_url = fix_url(obj["jobs-ref"])
        self.__group = group
        self.__collab = collab
        self.__nmpi_job = nmpi_job
        self.__nmpi_user = nmpi_user
        logger.info("established session to {} for {}", service_url, username)

    @staticmethod
    def open_job_from_database(
            service_url, job_url, cookies, headers) -> SpallocJob:
        """
        Create a job from the description in the attached database. This is
        intended to allow for access to the job's allocated resources from
        visualisers and other third party code participating in the SpiNNaker
        Tools Notification Protocol.

        .. note::
            The job is not verified to exist and be running. The session
            credentials may have expired; if so, the job will be unable to
            regenerate them.

        :param str service_url:
        :param str job_url:
        :param dict(str, str) cookies:
        :param dict(str, str) headers:
        :return: The job handle.
        :rtype: SpallocJob
        """
        session = Session(service_url, session_credentials=(cookies, headers))
        return _SpallocJob(session, job_url)

    @overrides(AbstractSpallocClient.list_machines)
    def list_machines(self) -> Dict[str, SpallocMachine]:
        assert self.__session
        obj = self.__session.get(self.__machines_url).json()
        return {m["name"]: _SpallocMachine(self.__session, m)
                for m in obj["machines"]}

    @overrides(AbstractSpallocClient.list_jobs)
    def list_jobs(self, deleted: bool = False) -> Iterable[SpallocJob]:
        assert self.__session
        obj = self.__session.get(
            self.__jobs_url,
            deleted=("true" if deleted else "false")).json()
        # Results are paged: keep following "next" until a page is empty or
        # has no successor
        while obj["jobs"]:
            for u in obj["jobs"]:
                yield _SpallocJob(self.__session, fix_url(u))
            if "next" not in obj:
                break
            obj = self.__session.get(obj["next"]).json()

    def _create(self, create: Mapping[str, JsonValue],
                machine_name: Optional[str]) -> SpallocJob:
        """
        Post a job creation request and wrap the job that results.

        :param create:
            The request-specific part of the job description; it is copied,
            not modified.
        :param machine_name:
            Which machine to allocate on. If not given, the request is
            tagged ``default`` instead.
        :return: A handle to the job at the location the service reports.
        """
        assert self.__session
        operation = dict(create)
        if machine_name:
            operation["machine-name"] = machine_name
        else:
            operation["tags"] = ["default"]
        # Client-wide optional settings are added only when configured
        if self.__group is not None:
            operation["group"] = self.__group
        if self.__collab is not None:
            operation["nmpi-collab"] = self.__collab
        if self.__nmpi_job is not None:
            operation["nmpi-job-id"] = self.__nmpi_job
        if self.__nmpi_user is not None:
            operation["owner"] = self.__nmpi_user
        logger.info("Posting {} to {}", operation, self.__jobs_url)
        r = self.__session.post(self.__jobs_url, operation, timeout=30)
        url = r.headers["Location"]
        return _SpallocJob(self.__session, fix_url(url))

    @staticmethod
    def __board_descriptor(
            triad: Optional[Tuple[int, int, int]],
            physical: Optional[Tuple[int, int, int]],
            ip_address: Optional[str]) -> JsonObject:
        """
        Build the ``board`` part of a job request from whichever way of
        identifying the board was supplied. The first one given wins, in
        the order triad, physical, IP address.

        :raises KeyError: if none of the three was given
        """
        if triad:
            x, y, z = triad
            return {"x": int(x), "y": int(y), "z": int(z)}
        if physical:
            c, f, b = physical
            return {"cabinet": int(c), "frame": int(f), "board": int(b)}
        if ip_address:
            return {"address": str(ip_address)}
        raise KeyError("at least one of triad, physical and ip_address "
                       "must be given")

    @overrides(AbstractSpallocClient.create_job)
    def create_job(
            self, num_boards: int = 1,
            machine_name: Optional[str] = None,
            keepalive: int = 45) -> SpallocJob:
        return self._create({
            "num-boards": int(num_boards),
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_rect)
    def create_job_rect(
            self, width: int, height: int,
            machine_name: Optional[str] = None,
            keepalive: int = 45) -> SpallocJob:
        return self._create({
            "dimensions": {
                "width": int(width),
                "height": int(height)
            },
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_board)
    def create_job_board(
            self, triad: Optional[Tuple[int, int, int]] = None,
            physical: Optional[Tuple[int, int, int]] = None,
            ip_address: Optional[str] = None,
            machine_name: Optional[str] = None,
            keepalive: int = 45) -> SpallocJob:
        board = self.__board_descriptor(triad, physical, ip_address)
        return self._create({
            "board": board,
            "keepalive-interval": f"PT{int(keepalive)}S"
        }, machine_name)

    @overrides(AbstractSpallocClient.create_job_rect_at_board)
    def create_job_rect_at_board(
            self, width: int, height: int,
            triad: Optional[Tuple[int, int, int]] = None,
            physical: Optional[Tuple[int, int, int]] = None,
            ip_address: Optional[str] = None,
            machine_name: Optional[str] = None, keepalive: int = 45,
            max_dead_boards: int = 0) -> SpallocJob:
        board = self.__board_descriptor(triad, physical, ip_address)
        return self._create({
            "dimensions": {
                "width": int(width),
                "height": int(height)
            },
            "board": board,
            "keepalive-interval": f"PT{int(keepalive)}S",
            "max-dead-boards": int(max_dead_boards)
        }, machine_name)

    def close(self) -> None:
        """
        Purge the session and drop it. Safe to call more than once.
        """
        # pylint: disable=protected-access
        if self.__session is not None:
            self.__session._purge()
        self.__session = None
class _ProxyServiceError(IOError):
"""
An error passed to us from the server over the proxy channel.
"""
def _spalloc_keepalive(url, interval, term_queue, cookies, headers):
    """
    Actual keepalive task implementation. Don't use directly.

    Repeatedly PUTs ``alive`` to the URL, waiting on the termination queue
    between requests; stops as soon as anything arrives on that queue.

    :param url: where to send the keepalive
    :param interval: how long to wait on the queue between requests
    :param term_queue: receiving any item on this ends the task
    :param cookies: cookies to send with each request
    :param headers:
        headers to send with each request; the mapping itself is left
        untouched
    """
    # Work on a copy so the caller's mapping never gains the content type
    headers = dict(headers)
    headers["Content-Type"] = "text/plain; charset=UTF-8"
    while True:
        requests.put(url, data="alive", cookies=cookies, headers=headers,
                     allow_redirects=False, timeout=10)
        try:
            term_queue.get(True, interval)
            break
        except queue.Empty:
            continue
        # On ValueError or OSError, just terminate the keepalive process
        # They happen when the term_queue is directly closed
        except (ValueError, OSError):
            break
class _SpallocMachine(SessionAware, SpallocMachine):
    """
    A machine managed by Spalloc, described by the data the service gave.

    Don't make this yourself. Use :py:class:`SpallocClient` instead.
    """
    __slots__ = ("__name", "__tags", "__width", "__height",
                 "__dead_boards", "__dead_links")

    def __init__(self, session: Session, machine_data: JsonObject):
        """
        :param _Session session:
        :param dict machine_data:
        """
        uri = cast(str, machine_data["uri"])
        super().__init__(session, uri)
        self.__name = cast(str, machine_data["name"])
        tag_list = cast(List[str], machine_data["tags"])
        self.__tags = frozenset(tag_list)
        self.__width = cast(int, machine_data["width"])
        self.__height = cast(int, machine_data["height"])
        self.__dead_boards = cast(list, machine_data["dead-boards"])
        self.__dead_links = cast(list, machine_data["dead-links"])

    @property
    @overrides(SpallocMachine.name)
    def name(self) -> str:
        return self.__name

    @property
    @overrides(SpallocMachine.tags)
    def tags(self) -> FrozenSet[str]:
        return self.__tags

    @property
    @overrides(SpallocMachine.width)
    def width(self) -> int:
        return self.__width

    @property
    @overrides(SpallocMachine.height)
    def height(self) -> int:
        return self.__height

    @property
    @overrides(SpallocMachine.dead_boards)
    def dead_boards(self) -> list:
        return self.__dead_boards

    @property
    @overrides(SpallocMachine.dead_links)
    def dead_links(self) -> list:
        return self.__dead_links

    @property
    @overrides(SpallocMachine.area)
    def area(self) -> Tuple[int, int]:
        size = (self.width, self.height)
        return size

    def __repr__(self):
        details = (
            self.name, self.tags, self.width, self.height, self.dead_boards,
            self.dead_links)
        return "SpallocMachine" + str(details)
class _ProxyPing(threading.Thread):
"""
Sends ping messages to an open websocket
"""
def __init__(self, ws, sleep_time=30):
super().__init__(daemon=True)
self.__ws = ws
self.__sleep_time = sleep_time
self.__closed = False
self.start()
def run(self):
"""
The handler loop of this thread
"""
while self.__ws.connected:
try:
self.__ws.ping()
except Exception: # pylint: disable=broad-except
# If closed, ignore error and get out of here
if self.__closed:
break
# Make someone aware of the error
logger.exception("Error in websocket before close")
sleep(self.__sleep_time)
def close(self):
"""
Mark as closed to avoid error messages.
"""
self.__closed = True
# Type of the callbacks that receive websocket traffic: they are given the
# raw frame, or None when the socket went away before it was closed.
_WSCB: TypeAlias = Callable[[Optional[bytes]], None]
class _ProxyReceiver(threading.Thread):
    """
    Receives all messages off an open websocket and dispatches them to
    registered listeners.
    """

    def __init__(self, ws: WebSocket):
        """
        Set up the (empty) listener tables and start the thread.

        :param ws: the websocket to read from
        """
        super().__init__(daemon=True)
        self.__ws = ws
        # One-shot callbacks awaiting a call's reply, by correlation ID
        self.__returns: Dict[int, _WSCB] = {}
        # Persistent callbacks for one-way messages, by channel ID
        self.__handlers: Dict[int, _WSCB] = {}
        self.__correlation_id = 0
        self.__closed = False
        self.start()

    def run(self) -> None:
        """
        The handler loop of this thread.
        """
        while self.__ws.connected:
            try:
                result: Tuple[int, bytes] = self.__ws.recv_data()
                frame = result[1]
                if len(frame) < _msg.size:
                    # Message is out of protocol
                    continue
            except Exception:  # pylint: disable=broad-except
                # If closed, ignore error and get out of here
                if self.__closed:
                    break
                # Make someone aware of the error
                logger.exception("Error in websocket before close")
                # If we are disconnected before closing, make errors happen
                if not self.__ws.connected:
                    for rt in self.__returns.values():
                        rt(None)
                    for hd in self.__handlers.values():
                        hd(None)
                    break
                # Nothing usable was received this time round. Without
                # this, the code below would see either no frame at all or
                # the previous one, and deliver that a second time.
                continue
            code, num = _msg.unpack_from(frame, 0)
            if code == ProxyProtocol.MSG:
                self.dispatch_message(num, frame)
            else:
                self.dispatch_return(num, frame)

    def expect_return(self, handler: _WSCB) -> int:
        """
        Register a one-shot listener for a call-like message's return.

        :param handler: called with the reply frame when it arrives
        :return: The correlation ID
        """
        c = self.__correlation_id
        self.__correlation_id += 1
        self.__returns[c] = handler
        return c

    def listen(self, channel_id: int, handler: _WSCB):
        """
        Register a persistent listener for one-way messages.

        :param channel_id: the channel whose messages are wanted
        :param handler: called with each message frame on that channel
        """
        self.__handlers[channel_id] = handler

    def dispatch_return(self, correlation_id: int, msg: bytes):
        """
        Dispatch a received call-return message. The listener is removed
        as it is used; a reply nobody is waiting for is dropped.
        """
        handler = self.__returns.pop(correlation_id, None)
        if handler:
            handler(msg)

    def dispatch_message(self, channel_id: int, msg: bytes):
        """
        Dispatch a received one-way message. A message for a channel with
        no listener is dropped.
        """
        handler = self.__handlers.get(channel_id)
        if handler:
            handler(msg)

    def unlisten(self, channel_id: int):
        """
        Deregister a listener for a channel

        :raises KeyError: if nothing is listening on that channel
        """
        self.__handlers.pop(channel_id)

    def close(self) -> None:
        """
        Mark receiver closed to avoid errors
        """
        self.__closed = True
class _SpallocJob(SessionAware, SpallocJob):
    """
    Represents a job in Spalloc.

    Don't make this yourself. Use :py:class:`SpallocClient` instead.
    """
    __slots__ = ("__machine_url", "__chip_url",
                 "_keepalive_url", "__keepalive_handle", "__proxy_handle",
                 "__proxy_thread", "__proxy_ping")

    def __init__(self, session: Session, job_handle: str):
        """
        :param _Session session:
        :param str job_handle:
        """
        super().__init__(session, job_handle)
        logger.info("established job at {}", job_handle)
        self.__machine_url = self._url + "machine"
        self.__chip_url = self._url + "chip"
        self._keepalive_url = self._url + "keepalive"
        # Termination queue of the keepalive task, while one is registered
        self.__keepalive_handle: Optional[Queue] = None
        # The proxy websocket and its helper threads, created on demand
        self.__proxy_handle: Optional[WebSocket] = None
        self.__proxy_thread: Optional[_ProxyReceiver] = None
        self.__proxy_ping: Optional[_ProxyPing] = None

    @overrides(SpallocJob.get_session_credentials_for_db)
    def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]:
        config: Dict[Tuple[str, str], str] = {}
        config["SPALLOC", "service uri"] = self._service_url
        config["SPALLOC", "job uri"] = self._url
        cookies, headers = self._session_credentials
        for k, v in cookies.items():
            config["COOKIE", k] = v
        for k, v in headers.items():
            # We never write the authorisation headers themselves;
            # we just extend the session. Skipping the entry (rather than
            # deleting it) leaves the credentials mapping as we found it.
            if k != "Authorization":
                config["HEADER", k] = v
        return config

    @overrides(SpallocJob.get_state)
    def get_state(self, wait_for_change: bool = False) -> SpallocState:
        # No timeout when waiting for a change, ten seconds otherwise
        timeout: Optional[int] = 10
        if wait_for_change:
            timeout = None
        obj = self._get(
            self._url, wait=wait_for_change, timeout=timeout).json()
        return SpallocState[obj["state"]]

    @overrides(SpallocJob.get_root_host)
    def get_root_host(self) -> Optional[str]:
        r = self._get(self.__machine_url)
        if r.status_code == 204:
            return None
        obj = r.json()
        # Each connection is a ((x, y), host) pair; the root is at (0, 0)
        for c in obj["connections"]:
            [x, y], host = c
            if x == 0 and y == 0:
                return host
        return None

    @overrides(SpallocJob.get_connections)
    def get_connections(self) -> Dict[XY, str]:
        r = self._get(self.__machine_url)
        if r.status_code == 204:
            return {}
        return {
            (int(x), int(y)): str(host)
            for ((x, y), host) in r.json()["connections"]
        }

    @property
    def __proxy_url(self) -> Optional[str]:
        """
        The URL for talking to the proxy connection system.

        Every read asks the service, so callers should read this once and
        keep the answer. ``None`` if the job has no content or does not
        advertise a proxy.
        """
        r = self._get(self._url)
        if r.status_code == 204:
            return None
        try:
            url = r.json()["proxy-ref"]
            logger.info("Connecting to proxy on {}", url)
            return url
        except KeyError:
            return None

    def __init_proxy(self) -> Tuple[_ProxyReceiver, WebSocket]:
        """
        Get the proxy websocket and its receiver, opening them first if
        there is none yet or the existing socket has disconnected.

        :raises ValueError: if the job has no proxy to connect to
        """
        if self.__proxy_handle is None or not self.__proxy_handle.connected:
            # Ask once and reuse the answer; each read is a service request
            proxy_url = self.__proxy_url
            if proxy_url is None:
                raise ValueError("no proxy available")
            self.__proxy_handle = self._websocket(
                proxy_url, origin=get_hostname(self._url))
            self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
            self.__proxy_ping = _ProxyPing(self.__proxy_handle)
        assert self.__proxy_handle is not None
        assert self.__proxy_thread is not None
        return self.__proxy_thread, self.__proxy_handle

    @overrides(SpallocJob.connect_to_board)
    def connect_to_board(
            self, x: int, y: int,
            port: int = SCP_SCAMP_PORT) -> SpallocSCPConnection:
        proxy, ws = self.__init_proxy()
        return _ProxiedSCAMPConnection(ws, proxy, int(x), int(y), int(port))

    @overrides(SpallocJob.connect_for_booting)
    def connect_for_booting(self) -> SpallocBootConnection:
        proxy, ws = self.__init_proxy()
        return _ProxiedBootConnection(ws, proxy)

    @overrides(SpallocJob.open_eieio_connection)
    def open_eieio_connection(self, x: int, y: int) -> SpallocEIEIOConnection:
        proxy, ws = self.__init_proxy()
        return _ProxiedEIEIOConnection(
            ws, proxy, int(x), int(y), SCP_SCAMP_PORT)

    @overrides(SpallocJob.open_eieio_listener_connection)
    def open_eieio_listener_connection(self) -> SpallocEIEIOListener:
        proxy, ws = self.__init_proxy()
        return _ProxiedEIEIOListener(ws, proxy, self.get_connections())

    @overrides(SpallocJob.open_udp_listener_connection)
    def open_udp_listener_connection(self) -> UDPConnection:
        proxy, ws = self.__init_proxy()
        return _ProxiedUDPListener(ws, proxy, self.get_connections())

    @overrides(SpallocJob.wait_for_state_change)
    def wait_for_state_change(self, old_state: SpallocState,
                              timeout: Optional[int] = None) -> SpallocState:
        # A destroyed job never changes again, so it is never waited on
        while old_state != SpallocState.DESTROYED:
            obj = self._get(self._url, wait="true", timeout=timeout).json()
            s = SpallocState[obj["state"]]
            if s != old_state or s == SpallocState.DESTROYED:
                return s
        return old_state

    @overrides(SpallocJob.wait_until_ready)
    def wait_until_ready(self, timeout: Optional[int] = None,
                         n_retries: Optional[int] = None):
        state = self.get_state()
        retries = 0
        # Stops when ready or when the retries run out; in the latter
        # case this returns without the job being ready
        while (state != SpallocState.READY and
               (n_retries is None or retries < n_retries)):
            retries += 1
            state = self.wait_for_state_change(state, timeout=timeout)
            if state == SpallocState.DESTROYED:
                raise SpallocException("job was unexpectedly destroyed")

    @overrides(SpallocJob.destroy)
    def destroy(self, reason: str = "finished"):
        if self.__keepalive_handle:
            self.__keepalive_handle.close()
            self.__keepalive_handle = None
        if self.__proxy_handle is not None:
            # Tell the helper threads first so they do not report the
            # errors that closing the socket causes
            if self.__proxy_thread:
                self.__proxy_thread.close()
            if self.__proxy_ping:
                self.__proxy_ping.close()
            self.__proxy_handle.close()
        self._delete(self._url, reason=str(reason))
        logger.info("deleted job at {}", self._url)

    @overrides(SpallocJob.keepalive)
    def keepalive(self) -> None:
        self._put(self._keepalive_url, "alive")

    @overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
    def launch_keepalive_task(
            self, period: float = 30) -> ContextManager[Process]:
        """
        .. note::
            Tricky! *Cannot* be done with a thread, as the main thread is known
            to do significant amounts of CPU-intensive work.
        """
        if self.__keepalive_handle is not None:
            raise SpallocException("cannot keep job alive from two tasks")
        q: Queue = Queue(1)
        p = Process(target=_spalloc_keepalive, args=(
            self._keepalive_url, 0 + period, q,
            *self._session_credentials), daemon=True)
        p.start()
        self.__keepalive_handle = q
        return self.__closer(q, p)

    @contextmanager
    def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
        """
        Context manager that stops the keepalive process on exit.

        :param q: the queue the process watches for its signal to stop
        :param p: the keepalive process
        """
        try:
            yield p
        finally:
            try:
                q.put("quit")
            except (ValueError, OSError):
                # destroy() closes this queue, after which it can no
                # longer be written to. The process must still be stopped
                # below, so this is not allowed to escape.
                pass
            # Give it a second, and if it still isn't dead, kill it
            p.join(1)
            if p.is_alive():
                p.kill()
            # The task is over; forget it so that another can be launched
            if self.__keepalive_handle is q:
                self.__keepalive_handle = None

    @overrides(SpallocJob.where_is_machine)
    def where_is_machine(self, x: int, y: int) -> Optional[
            Tuple[int, int, int]]:
        r = self._get(self.__chip_url, x=int(x), y=int(y))
        if r.status_code == 204:
            return None
        return cast(Tuple[int, int, int], tuple(
            r.json()["physical-board-coordinates"]))

    @property
    def _keepalive_handle(self) -> Optional[Queue]:
        """
        The termination queue of the registered keepalive task, if any.
        Can only be set while no task is registered.
        """
        return self.__keepalive_handle

    @_keepalive_handle.setter
    def _keepalive_handle(self, handle: Queue):
        if self.__keepalive_handle is not None:
            raise SpallocException("cannot keep job alive from two tasks")
        self.__keepalive_handle = handle

    @overrides(SpallocJob.create_transceiver)
    def create_transceiver(self) -> Transceiver:
        if self.get_state() != SpallocState.READY:
            raise SpallocException("job not ready to execute scripts")
        proxies: List[Connection] = [
            self.connect_to_board(x, y) for (x, y) in self.get_connections()]
        # Also need a boot connection
        proxies.append(self.connect_for_booting())
        return create_transceiver_from_connections(connections=proxies)

    def __repr__(self):
        return f"SpallocJob({self._url})"
class _ProxiedConnection(metaclass=AbstractBase):
    """
    Core multiplexer/demultiplexer emulating a connection that is proxied
    over a websocket.

    None of the methods are public because subclasses may expose a profile of
    them to conform to a particular type of connection.
    """

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        """
        Open a channel over the websocket and start listening on it.

        :param ws: the websocket to send on
        :param receiver: the thread that reads the websocket
        """
        self.__ws: Optional[WebSocket] = ws
        self.__receiver: Optional[_ProxyReceiver] = receiver
        # One-way messages received on our channel, still with headers
        self.__msgs: queue.SimpleQueue = queue.SimpleQueue()
        # Where the reply to the call currently in progress arrives
        self.__call_queue: queue.Queue = queue.Queue(1)
        self.__call_lock = threading.RLock()
        # A message taken off the queue by _is_ready_to_receive and held
        # for the next _receive
        self.__current_msg: Optional[bytes] = None
        # The channel is opened here, so anything a subclass needs in
        # _open_connection must be set before this constructor runs
        self.__handle = self._open_connection()
        self.__receiver.listen(self.__handle, self.__msgs.put)

    @abstractmethod
    def _open_connection(self) -> int:
        """
        Ask the proxy to open the channel.

        :return: the handle of the channel that was opened
        """
        raise NotImplementedError

    def _call(self, protocol: ProxyProtocol, packer: struct.Struct,
              unpacker: struct.Struct, *args) -> Tuple[Any, ...]:
        """
        Do a synchronous call.

        :param protocol:
            The protocol message number.
        :param packer:
            How to form the protocol message. The first two arguments passed
            will be the protocol message number and an issued correlation ID
            (not needed by the caller).
        :param unpacker:
            How to extract the expected response.
        :param args:
            Additional arguments to pass to the packer.
        :return:
            The results from the unpacker *after* the protocol message code and
            the correlation ID.
        :raises IOError:
            If something goes wrong. This could be due to the websocket being
            closed, or the receipt of an ERROR response.
        """
        if not self._connected:
            raise IOError("socket closed")
        if not self.__receiver:
            raise IOError("socket closed")
        if not self.__ws:
            raise IOError("socket closed")
        with self.__call_lock:
            # All calls via websocket use correlation_id
            correlation_id = self.__receiver.expect_return(
                self.__call_queue.put)
            self.__ws.send_binary(packer.pack(protocol, correlation_id, *args))
            if not self._connected:
                raise IOError("socket closed after send!")
            reply = self.__call_queue.get()
            if reply is None:
                # The receiver hands over None in place of a reply when
                # the websocket drops while calls are outstanding
                raise IOError("socket closed before reply received")
            code, _ = _msg.unpack_from(reply, 0)
            if code == ProxyProtocol.ERROR:
                # Rest of message is UTF-8 encoded error message string
                payload = reply[_msg.size:].decode("utf-8")
                if len(payload):
                    raise _ProxyServiceError(payload)
                raise _ProxyServiceError(
                    f"unknown problem with {protocol} call")
            return unpacker.unpack(reply)[2:]

    @property
    def _connected(self) -> bool:
        """
        Whether there is still a websocket and it reports being connected.
        """
        return bool(self.__ws and self.__ws.connected)

    def _throw_if_closed(self) -> None:
        """
        :raises IOError: if the connection is not connected
        """
        if not self._connected:
            raise IOError("socket closed")

    def _close(self) -> None:
        """
        Close the channel (if still connected), stop listening on it and
        let go of the websocket.

        :raises IOError: if the proxy does not confirm the close
        """
        if self._connected:
            channel_id, = self._call(
                ProxyProtocol.CLOSE, _close_req, _open_close_res,
                self.__handle)
            if channel_id != self.__handle:
                raise IOError("failed to close proxy socket")
        if self.__receiver:
            self.__receiver.unlisten(self.__handle)
        self.__ws = None
        self.__receiver = None

    def _send(self, message: bytes):
        """
        Send a message on the channel.

        :raises IOError: if the connection is closed
        """
        self._throw_if_closed()
        # Put the header on the front and send it
        if not self.__ws:
            raise IOError("socket closed")
        self.__ws.send_binary(_msg.pack(
            ProxyProtocol.MSG, self.__handle) + message)

    def _send_to(self, message: bytes, x: int, y: int, port: int):
        """
        Send a message on the channel to an explicitly given target.

        :raises IOError: if the connection is closed
        """
        self._throw_if_closed()
        # Put the header on the front and send it
        if not self.__ws:
            raise IOError("socket closed")
        self.__ws.send_binary(_msg_to.pack(
            ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)

    def __get(self, timeout: float = 0.5) -> bytes:
        """
        Get a value from the queue. Handles block/non-block switching and
        trimming of the message protocol prefix.

        :raises queue.Empty: if nothing arrived in time
        :raises IOError: if the websocket dropped
        """
        if not timeout:
            msg = self.__msgs.get(block=False)
        else:
            msg = self.__msgs.get(timeout=timeout)
        if msg is None:
            # The receiver delivers None instead of a message when the
            # websocket drops
            raise IOError("socket closed")
        return msg[_msg.size:]

    def _receive(self, timeout: Optional[float] = None) -> bytes:
        """
        Receive the next message, without its protocol prefix.

        :param timeout: how long to wait, or ``None`` to wait indefinitely
        :raises SpinnmanTimeoutException: if nothing arrived in time
        :raises IOError: if the connection is closed
        """
        # Hand over a message that _is_ready_to_receive already fetched
        if self.__current_msg is not None:
            try:
                return self.__current_msg
            finally:
                self.__current_msg = None
        if timeout is None:
            # Wait in short steps so that a closed socket gets noticed
            while True:
                try:
                    return self.__get()
                except queue.Empty:
                    self._throw_if_closed()
        else:
            try:
                return self.__get(timeout)
            except queue.Empty as e:
                self._throw_if_closed()
                raise SpinnmanTimeoutException("receive", timeout) from e

    def _is_ready_to_receive(self, timeout: float = 0) -> bool:
        """
        Whether a message is available, waiting up to the timeout for one.
        """
        # If we already have a message or the queue peek succeeds, return now
        if self.__current_msg is not None or not self.__msgs.empty():
            return True
        try:
            self.__current_msg = self.__get(timeout)
            return True
        except queue.Empty:
            return False
class _ProxiedBidirectionalConnection(
        _ProxiedConnection, SpallocProxiedConnection):
    """
    A proxied connection tied to one particular board.
    """

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        # These go first: the base constructor opens the connection, and
        # opening it reads them
        self.__connect_args = (x, y, port)
        super().__init__(ws, receiver)

    @overrides(_ProxiedConnection._open_connection)
    def _open_connection(self) -> int:
        reply = self._call(
            ProxyProtocol.OPEN, _open_req, _open_close_res,
            *self.__connect_args)
        # The reply carries exactly one value: the channel handle
        (channel,) = reply
        return channel

    @overrides(Connection.is_connected)
    def is_connected(self) -> bool:
        """
        Determines if the medium is connected at this point in time.

        :return: True if the medium is connected, False otherwise
        :rtype: bool
        """
        return self._connected

    @overrides(Connection.close)
    def close(self) -> None:
        """
        Closes the connection.
        """
        self._close()

    @overrides(SpallocProxiedConnection.send)
    def send(self, data: bytes):
        # Anything that is not already bytes-like gets converted
        if isinstance(data, (bytes, bytearray)):
            payload = data
        else:
            payload = bytes(data)
        self._send(payload)

    @overrides(SpallocProxiedConnection.receive)
    def receive(self, timeout: Optional[float] = None) -> bytes:
        return self._receive(timeout)

    @overrides(Listenable.is_ready_to_receive)
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
        return self._is_ready_to_receive(timeout)

    @abstractmethod
    def __str__(self) -> str:
        raise NotImplementedError
class _ProxiedUnboundConnection(
        _ProxiedConnection, SpallocProxiedConnection):
    """
    A connection that can listen to all boards via the proxy, but which can
    only send if a target board is provided.
    """

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        # These defaults must be in place *before* the base constructor
        # runs: it calls _open_connection, which records the real address
        # and port. Assigning them afterwards would wipe those out.
        self.__addr: Optional[str] = None
        self.__port: Optional[int] = None
        super().__init__(ws, receiver)

    @overrides(_ProxiedConnection._open_connection)
    def _open_connection(self) -> int:
        handle, ip1, ip2, ip3, ip4, self.__port = self._call(
            ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
        # Assemble the address into the format expected elsewhere
        self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
        return handle

    @property
    def _addr(self) -> Optional[str]:
        """
        The address given when the connection was opened, as a dotted
        quad; ``None`` once the connection is no longer connected.
        """
        return self.__addr if self._connected else None

    @property
    def _port(self) -> Optional[int]:
        """
        The port given when the connection was opened; ``None`` once the
        connection is no longer connected.
        """
        return self.__port if self._connected else None

    @overrides(Connection.is_connected)
    def is_connected(self) -> bool:
        """
        Determines if the medium is connected at this point in time.

        :return: True if the medium is connected, False otherwise
        :rtype: bool
        """
        return self._connected

    @overrides(Connection.close)
    def close(self) -> None:
        """
        Closes the connection.
        """
        self._close()

    @overrides(SpallocProxiedConnection.send)
    def send(self, data: bytes):
        # There is no target to send to; this always fails, reporting a
        # closed socket in preference if that is the case
        self._throw_if_closed()
        raise IOError("socket is not open for sending")

    @overrides(SpallocProxiedConnection.receive)
    def receive(self, timeout: Optional[float] = None) -> bytes:
        return self._receive(timeout)

    @overrides(Listenable.is_ready_to_receive)
    def is_ready_to_receive(self, timeout: float = 0) -> bool:
        return self._is_ready_to_receive(timeout)

    @abstractmethod
    def __str__(self) -> str:
        raise NotImplementedError
class _ProxiedSCAMPConnection(
        _ProxiedBidirectionalConnection, SpallocSCPConnection):
    """
    A proxied SCP connection to the chip at the given coordinates.
    """
    __slots__ = ("__chip_x", "__chip_y")

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        # Open the proxied channel, then set up the SCP side
        super().__init__(ws, receiver, x, y, port)
        SpallocSCPConnection.__init__(self, x, y)

    def __str__(self):
        where = f"{self.chip_x},{self.chip_y}"
        return f"SCAMPConnection[proxied]({where})"
class _ProxiedBootConnection(
        _ProxiedBidirectionalConnection, SpallocBootConnection):
    """
    A proxied connection to chip 0,0 on the default boot port.
    """
    __slots__ = ()

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
        boot_x, boot_y = 0, 0
        super().__init__(
            ws, receiver, boot_x, boot_y, UDP_BOOT_CONNECTION_DEFAULT_PORT)

    def __str__(self):
        return "BootConnection[proxied]()"
class _ProxiedEIEIOConnection(
        _ProxiedBidirectionalConnection,
        SpallocEIEIOConnection, SpallocProxiedConnection):
    """
    A proxied EIEIO connection to the chip at the given coordinates.
    """
    # Special: This is a unidirectional receive-only connection
    __slots__ = ("__addr", "__port", "__chip_x", "__chip_y")

    def __init__(
            self, ws: WebSocket, receiver: _ProxyReceiver,
            x: int, y: int, port: int):
        super().__init__(ws, receiver, x, y, port)
        self.__chip_x, self.__chip_y = x, y

    @property
    @overrides(SpallocEIEIOConnection._coords)
    def _coords(self) -> XY:
        return (self.__chip_x, self.__chip_y)

    def send_to(
            self,
            data: bytes, address: tuple):  # pylint: disable=unused-argument
        """
        Direct ``send_to`` is unsupported.

        :raises IOError: always
        """
        self._throw_if_closed()
        raise IOError("socket is not open for sending")

    def __str__(self):
        return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
                f"{self.__chip_y})")
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
    """
    A proxied EIEIO listener that can send to chips by coordinates.
    """
    __slots__ = ("__conns", )

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
                 conns: Dict[XY, str]):
        super().__init__(ws, receiver)
        # Invert the map
        by_address = {}
        for xy, ip in conns.items():
            by_address[ip] = xy
        self.__conns = by_address

    @overrides(SpallocEIEIOListener.send_to_chip)
    def send_to_chip(
            self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
        # Whatever was given, what gets sent is always plain bytes
        payload = bytes(message)
        self._send_to(payload, x, y, port)

    @property
    @overrides(SpallocEIEIOListener.local_ip_address)
    def local_ip_address(self) -> str:
        address = self._addr
        return address or "0.0.0.0"

    @property
    @overrides(SpallocEIEIOListener.local_port)
    def local_port(self) -> int:
        port = self._port
        return port or 0

    @overrides(SpallocEIEIOListener._get_chip_coords)
    def _get_chip_coords(self, ip_address: str) -> XY:
        key = str(ip_address)
        return self.__conns[key]

    def __str__(self):
        return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"
class _ProxiedUDPListener(_ProxiedUnboundConnection, UDPConnection):
    """
    A proxied UDP listener that sends to boards by their IP address.
    """
    __slots__ = ("__conns", )

    def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
                 conns: Dict[XY, str]):
        super().__init__(ws, receiver)
        # Invert the map
        by_address = {}
        for xy, ip in conns.items():
            by_address[ip] = xy
        self.__conns = by_address

    @overrides(UDPConnection.send_to)
    def send_to(self, data: bytes, address: Tuple[str, int]):
        # The address names a board by IP; the proxy wants coordinates
        ip, port = address
        chip_x, chip_y = self.__conns[ip]
        self._send_to(data, chip_x, chip_y, port)

    @property
    @overrides(UDPConnection.local_ip_address)
    def local_ip_address(self) -> str:
        address = self._addr
        return address or "0.0.0.0"

    @property
    @overrides(UDPConnection.local_port)
    def local_port(self) -> int:
        port = self._port
        return port or 0

    def __str__(self):
        return f"UDPConnection[proxied](local:{self._addr}:{self._port})"