Source code for spinnman.connections.udp_packet_connections.eieio_connection
from .udp_connection import UDPConnection
from spinnman.connections.abstract_classes \
import EIEIOReceiver, EIEIOSender, Listenable
from spinnman.messages.eieio \
import read_eieio_command_message, read_eieio_data_message
import struct
_REPR_TEMPLATE = "EIEIOConnection(local_host={}, local_port={},"\
"remote_host={}, remote_port={})"
[docs]class EIEIOConnection(
UDPConnection, EIEIOReceiver, EIEIOSender,
Listenable):
""" A UDP connection for sending and receiving raw EIEIO messages
"""
def __init__(self, local_host=None, local_port=None, remote_host=None,
remote_port=None):
"""
:param local_host: The optional ip address or host name of the local\
interface to listen on
:type local_host: str
:param local_port: The optional local port to listen on
:type local_port: int
:param remote_host: The optional remote host name or ip address to\
send messages to. If not specified, sending will not be\
possible using this connection
:type remote_host: str
:param remote_port: The optional remote port number to send messages\
to. If not specified, sending will not be possible using this\
connection
"""
UDPConnection.__init__(
self, local_host, local_port, remote_host, remote_port)
EIEIOReceiver.__init__(self)
EIEIOSender.__init__(self)
Listenable.__init__(self)
[docs] def receive_eieio_message(self, timeout=None):
data = self.receive(timeout)
header = struct.unpack_from("<H", data)[0]
if header & 0xC000 == 0x4000:
return read_eieio_command_message(data, 0)
else:
return read_eieio_data_message(data, 0)
[docs] def send_eieio_message(self, eieio_message):
self.send(eieio_message.bytestring)
[docs] def send_eieio_message_to(self, eieio_message, ip_address, port):
self.send_to(eieio_message.bytestring, (ip_address, port))
[docs] def get_receive_method(self):
return self.receive_eieio_message
def __repr__(self):
return _REPR_TEMPLATE.format(
self.local_ip_address, self.local_port,
self.remote_ip_address, self.remote_port)