Source code for spinnman.connections.udp_packet_connections.sdp_connection
from spinnman.messages.sdp import SDPMessage, SDPFlag
from .udp_connection import UDPConnection
from .utils import update_sdp_header_for_udp_send
from spinnman.connections.abstract_classes \
import SDPReceiver, SDPSender, Listenable
import struct
[docs]class SDPConnection(
UDPConnection, SDPReceiver, SDPSender,
Listenable):
def __init__(self, chip_x=None, chip_y=None, local_host=None,
local_port=None, remote_host=None, remote_port=None):
"""
:param chip_x: The optional x-coordinate of the chip at the remote\
end of the connection. If not specified, it will not be\
possible to send SDP messages that require a response with\
this connection.
:type chip_x: int
:param chip_y: The optional y-coordinate of the chip at the remote\
end of the connection. If not specified, it will not be\
possible to send SDP messages that require a response with\
this connection.
:type chip_y: int
:param local_host: The optional ip address or host name of the local\
interface to listen on
:type local_host: str
:param local_port: The optional local port to listen on
:type local_port: int
:param remote_host: The optional remote host name or ip address to\
send messages to. If not specified, sending will not be\
possible using this connection
:type remote_host: str
:param remote_port: The optional remote port number to send messages\
to. If not specified, sending will not be possible using this\
connection
"""
UDPConnection.__init__(
self, local_host, local_port, remote_host, remote_port)
SDPReceiver.__init__(self)
SDPSender.__init__(self)
Listenable.__init__(self)
self._chip_x = chip_x
self._chip_y = chip_y
[docs] def receive_sdp_message(self, timeout=None):
data = self.receive(timeout)
return SDPMessage.from_bytestring(data, 2)
[docs] def send_sdp_message(self, sdp_message):
# If a reply is expected, the connection should
if sdp_message.sdp_header.flags == SDPFlag.REPLY_EXPECTED:
update_sdp_header_for_udp_send(
sdp_message.sdp_header, self._chip_x, self._chip_y)
else:
update_sdp_header_for_udp_send(sdp_message.sdp_header, 0, 0)
self.send(struct.pack("<2x") + sdp_message.bytestring)
[docs] def get_receive_method(self):
return self.receive_sdp_message
def __repr__(self):
return \
"SDPConnection(chip_x={}, chip_y={}, local_host={},"\
" local_port={}, remote_host={}, remote_port={})".format(
self._chip_x, self._chip_y, self.local_ip_address,
self.local_port, self.remote_ip_address, self.remote_port)