# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
import functools
import struct
from collections import defaultdict
from typing import Dict, Iterable, List
from spinn_utilities.typing.coords import XYP
from spinn_machine import CoreSubsets
from spinnman.model import IOBuffer
from spinnman.utilities.utility_functions import get_vcpu_address
from spinnman.messages.scp.impl.read_memory import ReadMemory, Response
from spinnman.constants import UDP_MESSAGE_MAX_SIZE, CPU_IOBUF_ADDRESS_OFFSET
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from .abstract_multi_connection_process_connection_selector import (
ConnectionSelector)
@dataclass(frozen=True)
class _RegionTail:
scamp_coords: XYP
core_coords: XYP
n: int
base_address: int
size: int
offset: int
@dataclass(frozen=True)
class _NextRegion:
scamp_coords: XYP
core_coords: XYP
n: int
next_address: int
first_read_size: int
def next_at(self, address: int) -> '_NextRegion':
"""
:param int address:
:rtype: _NextRegion
"""
return _NextRegion(
self.scamp_coords, self.core_coords, self.n + 1, address,
self.first_read_size)
def tail(self, address: int, size: int, offset: int) -> _RegionTail:
"""
:param int address:
:param int size:
:param int offset:
:rtype: _RegionTail
"""
return _RegionTail(
self.scamp_coords, self.core_coords, self.n, address, size, offset)
_ENCODING = "ascii"
_ONE_WORD = struct.Struct("<I")
_FIRST_IOBUF = struct.Struct("<I8xI")
class ReadIOBufProcess(AbstractMultiConnectionProcess[Response]):
"""
A process for reading IOBUF memory (mostly log messages) from a
SpiNNaker core.
"""
__slots__ = (
"_extra_reads",
"_iobuf",
"_iobuf_address",
"_iobuf_view",
"_next_reads")
def __init__(self, connection_selector: ConnectionSelector) -> None:
"""
:param ConnectionSelector connection_selector:
"""
super().__init__(connection_selector)
# A dictionary of (x, y, p) -> iobuf address
self._iobuf_address: Dict[XYP, int] = dict()
# A dictionary of (x, y, p) -> OrderedDict(n) -> bytearray
self._iobuf: Dict[XYP, Dict[int, bytes]] = defaultdict(dict)
# A dictionary of (x, y, p) -> OrderedDict(n) -> memoryview
self._iobuf_view: Dict[XYP, Dict[int, memoryview]] = defaultdict(dict)
# A list of extra reads that need to be done as a result of the first
# read = list of (x, y, p, n, base_address, size, offset)
self._extra_reads: List[_RegionTail] = list()
# A list of next reads that need to be done as a result of the first
# read = list of (x, y, p, n, next_address, first_read_size)
self._next_reads: List[_NextRegion] = list()
def _request_iobuf_address(self, iobuf_size: int, x: int, y: int, p: int):
scamp_coords = (x, y, 0)
base_address = get_vcpu_address(p) + CPU_IOBUF_ADDRESS_OFFSET
self._send_request(
ReadMemory(scamp_coords, base_address, 4),
functools.partial(self.__handle_iobuf_address_response,
iobuf_size, scamp_coords, (x, y, p)))
def __handle_iobuf_address_response(
self, iobuf_size: int, scamp_coords: XYP, xyp: XYP,
response: Response):
iobuf_address, = _ONE_WORD.unpack_from(response.data, response.offset)
if iobuf_address != 0:
first_read_size = min((iobuf_size + 16, UDP_MESSAGE_MAX_SIZE))
self._next_reads.append(_NextRegion(
scamp_coords, xyp, 0, iobuf_address, first_read_size))
def _request_iobuf_region_tail(self, tail: _RegionTail):
self._send_request(
ReadMemory(tail.scamp_coords, tail.base_address, tail.size),
functools.partial(
self.__handle_extra_iobuf_response, tail))
def __handle_extra_iobuf_response(
self, tail: _RegionTail, response: Response):
view = self._iobuf_view[tail.core_coords][tail.n]
base = tail.offset
view[base:base + response.length] = response.data[
response.offset:response.offset + response.length]
def _request_iobuf_region(self, region: _NextRegion):
self._send_request(
ReadMemory(region.scamp_coords, region.next_address,
region.first_read_size),
functools.partial(self.__handle_first_iobuf_response, region))
def __handle_first_iobuf_response(
self, region: _NextRegion, response: Response):
base_address = region.next_address
# Unpack the iobuf header
(next_address, bytes_to_read) = _FIRST_IOBUF.unpack_from(
response.data, response.offset)
# Create a buffer for the data
data = bytearray(bytes_to_read)
view = memoryview(data)
self._iobuf[region.core_coords][region.n] = data
self._iobuf_view[region.core_coords][region.n] = view
# Put the data from this packet into the buffer
packet_bytes = response.length - 16
if packet_bytes > bytes_to_read:
packet_bytes = bytes_to_read
if packet_bytes > 0:
offset = response.offset + 16
view[0:packet_bytes] = response.data[offset:offset + packet_bytes]
bytes_to_read -= packet_bytes
base_address += packet_bytes + 16
read_offset = packet_bytes
# While more reads need to be done to read the data
while bytes_to_read > 0:
# Read the next bit of memory making up the buffer
next_bytes_to_read = min((bytes_to_read, UDP_MESSAGE_MAX_SIZE))
self._extra_reads.append(region.tail(
base_address, next_bytes_to_read, read_offset))
base_address += next_bytes_to_read
read_offset += next_bytes_to_read
bytes_to_read -= next_bytes_to_read
# If there is another IOBuf buffer, read this next
if next_address != 0:
self._next_reads.append(region.next_at(next_address))
[docs]
def read_iobuf(
self, iobuf_size: int,
core_subsets: CoreSubsets) -> Iterable[IOBuffer]:
"""
:param int iobuf_size:
:param ~spinn_machine.CoreSubsets core_subsets:
:rtype: iterable(IOBuffer)
"""
# Get the iobuf address for each core
with self._collect_responses():
for core_subset in core_subsets:
x, y = core_subset.x, core_subset.y
for p in core_subset.processor_ids:
self._request_iobuf_address(iobuf_size, x, y, p)
# Run rounds of the process until reading is complete
while self._extra_reads or self._next_reads:
with self._collect_responses():
# Process the extra iobuf reads needed
while self._extra_reads:
self._request_iobuf_region_tail(self._extra_reads.pop())
# Process the next iobuf reads needed
while self._next_reads:
self._request_iobuf_region(self._next_reads.pop())
for core_subset in core_subsets:
x, y = core_subset.x, core_subset.y
for p in core_subset.processor_ids:
iobuf = ""
for item in self._iobuf[x, y, p].values():
iobuf += item.decode(_ENCODING)
yield IOBuffer(x, y, p, iobuf)