Source code for spinnman.connections.udp_packet_connections.bmp_connection

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from typing import Optional, Sequence, Tuple

from spinn_utilities.overrides import overrides

from spinnman.constants import SCP_SCAMP_PORT
from spinnman.messages.scp.enums import SCPResult
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
from spinnman.connections.abstract_classes import AbstractSCPConnection
from spinnman.model import BMPConnectionData

from .udp_connection import UDPConnection

_TWO_SHORTS = struct.Struct("<2H")
_TWO_SKIP = struct.Struct("<2x")


class BMPConnection(UDPConnection, AbstractSCPConnection):
    """
    A BMP connection which supports queries to the BMP of a SpiNNaker machine.
    """
    __slots__ = ("_boards", )

    def __init__(self, connection_data: BMPConnectionData):
        """
        :param BMPConnectionData connection_data:
            The description of what to connect to.
        """
        port = SCP_SCAMP_PORT if connection_data.port_num is None\
            else connection_data.port_num
        super().__init__(
            remote_host=connection_data.ip_address, remote_port=port)
        self._boards = connection_data.boards

    @property
    def boards(self) -> Sequence[int]:
        """
        The set of boards supported by the BMP.

        :rtype: iterable(int)
        """
        return self._boards

    @property
    @overrides(AbstractSCPConnection.chip_x, extend_doc=False)
    def chip_x(self) -> int:
        """
        Defined to satisfy the AbstractSCPConnection - always 0 for a BMP.
        """
        return 0

    @property
    @overrides(AbstractSCPConnection.chip_y, extend_doc=False)
    def chip_y(self) -> int:
        """
        Defined to satisfy the AbstractSCPConnection - always 0 for a BMP.
        """
        return 0

[docs] @overrides(AbstractSCPConnection.get_scp_data) def get_scp_data(self, scp_request: AbstractSCPRequest) -> bytes: scp_request.sdp_header.update_for_send(0, 0) return _TWO_SKIP.pack() + scp_request.bytestring
[docs] @overrides(AbstractSCPConnection.receive_scp_response) def receive_scp_response(self, timeout: Optional[float] = 1.0) -> Tuple[ SCPResult, int, bytes, int]: data = self.receive(timeout) result, sequence = _TWO_SHORTS.unpack_from(data, 10) return SCPResult(result), sequence, data, 2
def __repr__(self): return ( f"BMPConnection(" f"boards={self._boards}, local_host={self.local_ip_address}, " f"local_port={self.local_port}, " f"remote_host={self.remote_ip_address}, " f"remote_port={self.remote_port})")