Source code for spinnman.messages.eieio.data_messages.eieio_data_message
from spinnman.exceptions import SpinnmanInvalidParameterException
from spinnman.exceptions import SpinnmanInvalidPacketException
from spinnman.messages.eieio.data_messages.eieio_key_payload_data_element\
import EIEIOKeyPayloadDataElement
from spinnman.messages.eieio.data_messages.eieio_key_data_element\
import EIEIOKeyDataElement
from spinnman.messages.eieio.data_messages.eieio_data_header\
import EIEIODataHeader
from spinnman.messages.eieio.abstract_messages.abstract_eieio_message\
import AbstractEIEIOMessage
from spinnman.messages.eieio.eieio_type import EIEIOType
from spinnman.messages.eieio.eieio_prefix import EIEIOPrefix
from spinnman import constants
import math
[docs]class EIEIODataMessage(AbstractEIEIOMessage):
""" An EIEIO Data message
"""
def __init__(self, eieio_header, data_reader=None):
"""
:param eieio_header: The header of the message
:type eieio_header:\
:py:class:`spinnman.messages.eieio.data_messages.eieio_data_header.EIEIODataHeader`
:param data_reader: Optional reader of data contained within the\
packet, or None if this packet is being written
:type data_reader:\
:py:class:`spinnman.data.abstract_data_reader.AbstractDataReader`
"""
# The header
self._eieio_header = eieio_header
# Elements to be written
self._elements = list()
# Keeping track of the reading of the data
self._data_reader = data_reader
self._elements_read = 0
@property
def eieio_header(self):
return self._eieio_header
@staticmethod
[docs] def min_packet_length(eieio_type, is_prefix=False, is_payload_base=False):
""" The minimum length of a message with the given header, in bytes
:param eieio_type: the type of message
:type eieio_type:\
:py:class:`spinnman.spinnman.messages.eieio.eieio_type.EIEIOType`
:param is_prefix: True if there is a prefix, False otherwise
:type is_prefix: bool
:param is_payload_base: True if there is a payload base, False\
otherwise
:type is_payload_base: bool
:return: The minimum size of the packet in bytes
:rtype: int
"""
header_size = EIEIODataHeader.get_header_size(eieio_type, is_prefix,
is_payload_base)
return header_size + eieio_type.payload_bytes
@property
def max_n_elements(self):
""" The maximum number of elements that can fit in the packet
:rtype: int
"""
return int(math.floor((constants.UDP_MESSAGE_MAX_SIZE -
self._eieio_header.size) /
(self._eieio_header.eieio_type.key_bytes +
self._eieio_header.eieio_type.payload_bytes)))
@property
def n_elements(self):
""" The number of elements in the packet
"""
return self._eieio_header.count
@property
def size(self):
""" The size of the packet with the current contents
"""
return (self._eieio_header.size +
((self._eieio_header.eieio_type.key_bytes +
self._eieio_header.eieio_type.payload_bytes) *
self._eieio_header.count))
[docs] def add_element(self, element):
""" Add an element to the message. The correct type of element must\
be added, depending on the header values
:param element: The element to be added
:type element:\
:py:class:`spinnman.messages.eieio.data_messages.abstract_eieio_data_element.AbstractEIEIODataElement`
:raise SpinnmanInvalidParameterException: If the element is not\
compatible with the header
:raise SpinnmanInvalidPacketException: If the message was created to\
read data from a reader
"""
if self._data_reader is not None:
raise SpinnmanInvalidPacketException(
"EIEIODataMessage", "This packet is read-only")
if (self._eieio_header.eieio_type.payload_bytes == 0 and
isinstance(element, EIEIOKeyPayloadDataElement)):
raise SpinnmanInvalidParameterException(
"element", element,
"The element has a payload, but the header says no payload")
if (self._eieio_header.eieio_type.payload_bytes != 0 and
isinstance(element, EIEIOKeyDataElement)):
raise SpinnmanInvalidParameterException(
"element", element,
"The element has nopayload, but the header says payload")
self._elements.append(element)
self._eieio_header.increment_count()
@property
def is_next_element(self):
""" Determine if there is another element to be read
:return: True if the message was created with data, and there are more\
elements to be read
:rtype: bool
"""
return (self._data_reader is not None and
self._elements_read < self._eieio_header.count)
@property
def next_element(self):
""" The next element to be read, or None if no more elements. The\
exact type of element returned depends on the packet type
:rtype:\
:py:class:`spinnman.messages.eieio.data_messages.abstract_eieio_data_element.AbstractEIEIODataElement`
"""
if not self.is_next_element:
return None
self._elements_read += 1
key = None
payload = None
if self._eieio_header.eieio_type == EIEIOType.KEY_16_BIT:
key = self._data_reader.read_short()
if self._eieio_header.eieio_type == EIEIOType.KEY_32_BIT:
key = self._data_reader.read_int()
if self._eieio_header.eieio_type == EIEIOType.KEY_PAYLOAD_16_BIT:
key = self._data_reader.read_short()
payload = self._data_reader.read_short()
if self._eieio_header.eieio_type == EIEIOType.KEY_PAYLOAD_32_BIT:
key = self._data_reader.read_int()
payload = self._data_reader.read_int()
if self._eieio_header.prefix is not None:
if self._eieio_header.prefix_type == EIEIOPrefix.UPPER_HALF_WORD:
key = key | (self._eieio_header.prefix << 16)
else:
key = key | self._eieio_header.prefix
if self._eieio_header.payload_base is not None:
if payload is not None:
payload = payload | self._eieio_header.payload_base
else:
payload = self._eieio_header.payload_base
if payload is None:
return EIEIOKeyDataElement(key)
else:
return EIEIOKeyPayloadDataElement(key, payload,
self._eieio_header.is_time)
[docs] def write_eieio_message(self, byte_writer):
self._eieio_header.write_eieio_header(byte_writer)
for element in self._elements:
element.write_element(self._eieio_header.eieio_type, byte_writer)
def __str__(self):
if self._data_reader is not None:
return "EIEIODataMessage:{}:{}".format(
self._eieio_header, self._eieio_header.count)
return "EIEIODataMessage:{}:{}".format(
self._eieio_header, self._elements)
def __repr__(self):
return self.__str__()