Source code for spinnman.messages.eieio.data_messages.eieio_data_message

# Copyright (c) 2015 The University of Manchester
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from typing import Optional
from spinn_utilities.overrides import overrides
from spinnman.exceptions import (
    SpinnmanInvalidPacketException, SpinnmanInvalidParameterException)
from spinnman.messages.eieio import (
    AbstractEIEIOMessage, EIEIOType, EIEIOPrefix)
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
from .eieio_data_header import EIEIODataHeader
from .key_data_element import KeyDataElement
from .key_payload_data_element import KeyPayloadDataElement
from .abstract_data_element import AbstractDataElement

_ONE_SHORT = struct.Struct("<H")
_TWO_SHORTS = struct.Struct("<HH")
_ONE_WORD = struct.Struct("<I")
_TWO_WORDS = struct.Struct("<II")

class EIEIODataMessage(AbstractEIEIOMessage):
    An EIEIO Data message.
    __slots__ = (

    def __init__(self, eieio_header: EIEIODataHeader,
                 data: Optional[bytes] = None, offset: int = 0):
        :param EIEIODataHeader eieio_header: The header of the message
        :param bytes data: Optional data contained within the packet
        :param int offset: Optional offset where the valid data starts
        # The header
        self._header = eieio_header

        # Elements to be written
        self._elements = b""

        # Keeping track of the reading of the data
        self._data = data
        self._offset = offset
        self._elements_read = 0

[docs] @staticmethod def create( eieio_type: EIEIOType, count: int = 0, data: Optional[bytes] = None, offset: int = 0, key_prefix: Optional[int] = None, payload_prefix: Optional[int] = None, timestamp: Optional[int] = None, prefix_type: EIEIOPrefix = EIEIOPrefix.LOWER_HALF_WORD ) -> "EIEIODataMessage": """ Create a data message. :param EIEIOType eieio_type: The type of the message :param int count: The number of items in the message :param bytes data: The data in the message :param int offset: The offset in the data where the actual data starts :param int key_prefix: The prefix of the keys :param int payload_prefix: The prefix of the payload :param int timestamp: The timestamp of the packet :param EIEIOPrefix prefix_type: The type of the key prefix if 16-bits """ # pylint: disable=too-many-arguments payload_base = payload_prefix if timestamp is not None: payload_base = timestamp return EIEIODataMessage( eieio_header=EIEIODataHeader( eieio_type, count=count, prefix=key_prefix, payload_base=payload_base, prefix_type=prefix_type, is_time=timestamp is not None), data=data, offset=offset)
@property @overrides(AbstractEIEIOMessage.eieio_header) def eieio_header(self) -> EIEIODataHeader: """ :rtype: EIEIODataHeader """ return self._header
[docs] @staticmethod def min_packet_length( eieio_type: EIEIOType, is_prefix: bool = False, is_payload_base: bool = False, is_timestamp: bool = False) -> int: """ The minimum length of a message with the given header, in bytes. :param EIEIOType eieio_type: the type of message :param bool is_prefix: True if there is a prefix, False otherwise :param bool is_payload_base: True if there is a payload base, False otherwise :param bool is_timestamp: True if there is a timestamp, False otherwise :return: The minimum size of the packet in bytes :rtype: int """ header_size = EIEIODataHeader.get_header_size( eieio_type, is_prefix, is_payload_base | is_timestamp) return header_size + eieio_type.payload_bytes
[docs] def get_min_packet_length(self) -> int: """ Get the minimum length of a message instance in bytes. :rtype: int """ return self.min_packet_length( eieio_type=self._header.eieio_type, is_prefix=self._header.prefix is not None, is_payload_base=self._header.payload_base is not None)
@property def max_n_elements(self) -> int: """ The maximum number of elements that can fit in the packet. :rtype: int """ ty = self._header.eieio_type return (UDP_MESSAGE_MAX_SIZE - self._header.size) // ( ty.key_bytes + ty.payload_bytes) @property def n_elements(self) -> int: """ The number of elements in the packet. :rtype: int """ return self._header.count @property def size(self) -> int: """ The size of the packet with the current contents. :rtype: int """ ty = self._header.eieio_type return (self._header.size + (ty.key_bytes + ty.payload_bytes) * self._header.count)
[docs] def add_key_and_payload(self, key: int, payload: int) -> None: """ Adds a key and payload to the packet. :param int key: The key to add :param int payload: The payload to add :raise SpinnmanInvalidParameterException: If the key or payload is too big for the format, or the format doesn't expect a payload """ if key > self._header.eieio_type.max_value: raise SpinnmanInvalidParameterException( "key", key, "Larger than the maximum allowed of " f"{self._header.eieio_type.max_value}") if payload > self._header.eieio_type.max_value: raise SpinnmanInvalidParameterException( "payload", payload, "Larger than the maximum allowed of " f"{self._header.eieio_type.max_value}") self.add_element(KeyPayloadDataElement( key, payload, self._header.is_time))
[docs] def add_key(self, key: int) -> None: """ Add a key to the packet. :param int key: The key to add :raise SpinnmanInvalidParameterException: If the key is too big for the format, or the format expects a payload """ if key > self._header.eieio_type.max_value: raise SpinnmanInvalidParameterException( "key", key, "Larger than the maximum allowed of " f"{self._header.eieio_type.max_value}") self.add_element(KeyDataElement(key))
[docs] def add_element(self, element: AbstractDataElement) -> None: """ Add an element to the message. The correct type of element must be added, depending on the header values. :param AbstractDataElement element: The element to be added :raise SpinnmanInvalidParameterException: If the element is not compatible with the header :raise SpinnmanInvalidPacketException: If the message was created to read data """ if self._data is not None: raise SpinnmanInvalidPacketException( "EIEIODataMessage", "This packet is read-only") self._elements += element.get_bytestring(self._header.eieio_type) self._header.increment_count()
@property def is_next_element(self) -> bool: """ Whether there is another element to be read. :rtype: bool """ return (self._data is not None and self._elements_read < self._header.count) @property def next_element(self) -> Optional[AbstractDataElement]: """ The next element to be read, or `None` if no more elements. The exact type of element returned depends on the packet type. :rtype: AbstractDataElement """ if not self.is_next_element: return None self._elements_read += 1 payload: Optional[int] = None assert self._data is not None if self._header.eieio_type == EIEIOType.KEY_16_BIT: key = _ONE_SHORT.unpack_from(self._data, self._offset)[0] self._offset += 2 elif self._header.eieio_type == EIEIOType.KEY_32_BIT: key = _ONE_WORD.unpack_from(self._data, self._offset)[0] self._offset += 4 elif self._header.eieio_type == EIEIOType.KEY_PAYLOAD_16_BIT: key, payload = _TWO_SHORTS.unpack_from(self._data, self._offset) self._offset += 4 elif self._header.eieio_type == EIEIOType.KEY_PAYLOAD_32_BIT: key, payload = _TWO_WORDS.unpack_from(self._data, self._offset) self._offset += 8 else: raise ValueError( f"unknown EIEIO type code: {self._header.eieio_type}") if self._header.prefix is not None: if self._header.prefix_type == EIEIOPrefix.UPPER_HALF_WORD: key |= self._header.prefix << 16 else: key = key | self._header.prefix if self._header.payload_base is not None: if payload is not None: payload |= self._header.payload_base else: payload = self._header.payload_base if payload is None: return KeyDataElement(key) return KeyPayloadDataElement(key, payload, self._header.is_time) @property @overrides(AbstractEIEIOMessage.bytestring) def bytestring(self) -> bytes: return self._header.bytestring + self._elements def __str__(self) -> str: if self._data is not None: return f"EIEIODataMessage:{self._header}:{self._header.count}" return f"EIEIODataMessage:{self._header}:{self._elements!r}" def __repr__(self) -> str: return self.__str__()