Source code for spinnman.spalloc.spalloc_scp_connection

# Copyright (c) 2022 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from typing import Optional, Tuple
from spinn_utilities.abstract_base import AbstractBase
from spinn_utilities.overrides import overrides
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.messages.sdp import SDPMessage, SDPFlag
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
from spinnman.messages.scp.enums import SCPResult
from .spalloc_proxied_connection import SpallocProxiedConnection

_TWO_SHORTS = struct.Struct("<2H")
_TWO_SKIP: bytes = b'\0\0'


[docs] class SpallocSCPConnection( SCAMPConnection, SpallocProxiedConnection, metaclass=AbstractBase): """ The socket interface supported by proxied sockets. The socket will always be talking to a specific board. This emulates a :py:class:`SCAMPConnection`. """ __slots__ = () def __init__(self, x, y): super(SpallocSCPConnection, self).__init__(x, y)
[docs] @overrides(SCAMPConnection.receive_sdp_message) def receive_sdp_message( self, timeout: Optional[float] = None) -> SDPMessage: data = self.receive(timeout) return SDPMessage.from_bytestring(data, 2)
[docs] @overrides(SCAMPConnection.send_sdp_message) def send_sdp_message(self, sdp_message: SDPMessage): # If a reply is expected, the connection should if sdp_message.sdp_header.flags == SDPFlag.REPLY_EXPECTED: sdp_message.sdp_header.update_for_send(self.chip_x, self.chip_y) else: sdp_message.sdp_header.update_for_send(0, 0) self.send(_TWO_SKIP + sdp_message.bytestring)
[docs] @overrides(SCAMPConnection.receive_scp_response) def receive_scp_response(self, timeout: Optional[float] = 1.0) -> Tuple[ SCPResult, int, bytes, int]: data = self.receive(timeout) result, sequence = _TWO_SHORTS.unpack_from(data, 10) return SCPResult(result), sequence, data, 2
[docs] @overrides(SCAMPConnection.get_scp_data) def get_scp_data( self, scp_request: AbstractSCPRequest, x: Optional[int] = None, y: Optional[int] = None) -> bytes: if x is None: x = self.chip_x if y is None: y = self.chip_y scp_request.sdp_header.update_for_send(x, y) return _TWO_SKIP + scp_request.bytestring