Source code for spinnman.messages.eieio.data_messages.eieio_data_header
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
from typing import Optional
from spinnman.messages.eieio import EIEIOType, EIEIOPrefix
from spinnman.exceptions import SpinnmanInvalidPacketException
_PATTERN_BBHH = struct.Struct("<BBHH")
_PATTERN_BBH = struct.Struct("<BBH")
_PATTERN_BBHI = struct.Struct("<BBHI")
_PATTERN_BBI = struct.Struct("<BBI")
_PATTERN_BB = struct.Struct("<BB")
_PATTERN_HH = struct.Struct("<HH")
_PATTERN_H = struct.Struct("<H")
_PATTERN_HI = struct.Struct("<HI")
_PATTERN_I = struct.Struct("<I")
class EIEIODataHeader(object):
"""
The header part of EIEIO data.
"""
__slots__ = (
"_count",
"_eieio_type",
"_is_time",
"_payload_base",
"_prefix",
"_prefix_type",
"_tag")
def __init__(self, eieio_type: EIEIOType, tag: int = 0,
prefix: Optional[int] = None,
prefix_type: EIEIOPrefix = EIEIOPrefix.LOWER_HALF_WORD,
payload_base: Optional[int] = None, is_time: bool = False,
count: int = 0):
"""
EIEIO header for data packets.
:param EIEIOType eieio_type: the type of message
:param int tag: the tag of the message (0 by default)
:param prefix: the key prefix of the message or `None` if not prefixed
:type prefix: int or None
:param EIEIOPrefix prefix_type:
the position of the prefix (upper or lower)
:param payload_base:
The base payload to be applied, or `None` if no base payload
:type payload_base: int or None
:param bool is_time:
True if the payloads should be taken to be timestamps, or False
otherwise
:param int count: Count of the number of items in the packet
"""
# pylint: disable=too-many-arguments
self._eieio_type = eieio_type
self._tag = tag
self._prefix = prefix
self._prefix_type = prefix_type
self._payload_base = payload_base
self._is_time = is_time
self._count = count
@property
def eieio_type(self) -> EIEIOType:
"""
Gets the eieio_type passed into the init.
:rtype: EIEIOType
"""
return self._eieio_type
@property
def tag(self) -> int:
"""
Gets the tag value passed into the init.
:rtype: int
"""
return self._tag
@property
def prefix(self) -> Optional[int]:
"""
Gets prefix passed into the init (if applicable).
:rtype: int or None
"""
return self._prefix
@property
def prefix_type(self) -> EIEIOPrefix:
"""
Gets the prefix_type passed into the init.
:rtype: EIEIOPrefix
"""
return self._prefix_type
@property
def payload_base(self) -> Optional[int]:
"""
Gets the payload_base value passed into the init (if applicable).
:rtype: int or None
"""
return self._payload_base
@property
def is_time(self) -> bool:
"""
Gets the is_time value passed into the init.
:rtype: bool
"""
return self._is_time
@property
def count(self) -> int:
"""
Count of the number of items in the packet
:rtype: int
"""
return self._count
@count.setter
def count(self, count: int) -> None:
"""
Sets the Count of the number of items in the packet
:param int count: the new value
"""
self._count = count
@property
def size(self) -> int:
"""
Get the size of a header with the given parameters.
:rtype: int
"""
return EIEIODataHeader.get_header_size(
self._eieio_type, self._prefix is not None,
self._payload_base is not None)
@property
def bytestring(self) -> bytes:
"""
The byte-string of the header.
:rtype: bytes
"""
# Convert the flags to an int
data = 0
# the flag for prefix or not
if self._prefix is not None:
data |= 1 << 7
# the prefix type
data |= self._prefix_type.value << 6
# the flag for payload prefix
if self._payload_base is not None:
data |= 1 << 5
# the flag for time in payloads
if self._is_time:
data |= 1 << 4
# The type of the packet
data |= self._eieio_type.value << 2
# The tag of the packet
data |= self._tag
# Convert the remaining data, depending on the various options
if self._payload_base is None:
if self._prefix is not None:
return _PATTERN_BBH.pack(self._count, data, self._prefix)
return _PATTERN_BB.pack(self._count, data)
if (self._eieio_type == EIEIOType.KEY_PAYLOAD_16_BIT or
self._eieio_type == EIEIOType.KEY_16_BIT):
if self._prefix is not None:
return _PATTERN_BBHH.pack(
self._count, data, self._prefix, self._payload_base)
return _PATTERN_BBH.pack(self._count, data, self._payload_base)
if (self._eieio_type == EIEIOType.KEY_PAYLOAD_32_BIT or
self._eieio_type == EIEIOType.KEY_32_BIT):
if self._prefix is not None:
return _PATTERN_BBHI.pack(
self._count, data, self._prefix, self._payload_base)
return _PATTERN_BBI.pack(self._count, data, self._payload_base)
raise SpinnmanInvalidPacketException(
"EIEIODataMessage", "unexpected EIEIO type code")
def __str__(self) -> str:
return (f"EIEIODataHeader:prefix={self._prefix}:"
f"prefix_type={self._prefix_type}:"
f"payload_base={self._payload_base}:"
"is_time={self._is_time}:type={self._eieio_type.value}:"
"tag={self._tag}:count={self._count}")
def __repr__(self) -> str:
return self.__str__()