# Source code for spinnman.processes.read_iobuf_process

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
import functools
import struct
from collections import defaultdict
from typing import Dict, Iterable, List
from spinn_utilities.typing.coords import XYP
from spinn_machine import CoreSubsets
from spinnman.model import IOBuffer
from spinnman.utilities.utility_functions import get_vcpu_address
from spinnman.messages.scp.impl.read_memory import ReadMemory, Response
from spinnman.constants import UDP_MESSAGE_MAX_SIZE, CPU_IOBUF_ADDRESS_OFFSET
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from .abstract_multi_connection_process_connection_selector import (
    ConnectionSelector)


@dataclass(frozen=True)
class _RegionTail:
    """
    A follow-up read needed to complete an IOBUF region whose contents did
    not fit in the first response packet.
    """
    # Coordinates the read request is addressed to
    scamp_coords: XYP
    # Coordinates of the core whose IOBUF is being read; used as the key
    # when looking up the buffer that the data is copied into
    core_coords: XYP
    # Index of the region within the core's chain of IOBUF regions
    n: int
    # Address in memory at which this read starts
    base_address: int
    # Number of bytes to read
    size: int
    # Position within the region's buffer where the read data is stored
    offset: int


@dataclass(frozen=True)
class _NextRegion:
    """
    The first read still to be made for one region in a core's chain of
    IOBUF regions.
    """
    # Coordinates the read request is addressed to
    scamp_coords: XYP
    # Coordinates of the core whose IOBUF is being read
    core_coords: XYP
    # Index of this region within the core's chain of IOBUF regions
    n: int
    # Address in memory of the start of this region
    next_address: int
    # Number of bytes to request in the first read of a region
    first_read_size: int

    def next_at(self, address: int) -> '_NextRegion':
        """
        Describe the region that follows this one in the chain.

        :param int address: Where the following region starts
        :return: A descriptor for the same core with the region index
            advanced by one and the same first read size
        :rtype: _NextRegion
        """
        return _NextRegion(
            scamp_coords=self.scamp_coords,
            core_coords=self.core_coords,
            n=self.n + 1,
            next_address=address,
            first_read_size=self.first_read_size)

    def tail(self, address: int, size: int, offset: int) -> _RegionTail:
        """
        Describe a further read needed to finish reading this region.

        :param int address: Where in memory the further read starts
        :param int size: How many bytes the further read covers
        :param int offset:
            Where in this region's buffer the read data is to be placed
        :return: A descriptor for the same core and region index
        :rtype: _RegionTail
        """
        return _RegionTail(
            scamp_coords=self.scamp_coords,
            core_coords=self.core_coords,
            n=self.n,
            base_address=address,
            size=size,
            offset=offset)


# Text encoding used to turn the raw IOBUF bytes into a string
_ENCODING = "ascii"
# A single little-endian unsigned 32-bit word
_ONE_WORD = struct.Struct("<I")
# The 16-byte header at the start of an IOBUF region: one little-endian
# unsigned 32-bit word, 8 skipped bytes, then another unsigned 32-bit word.
# The two words are unpacked as (next region address, bytes to read).
_FIRST_IOBUF = struct.Struct("<I8xI")


class ReadIOBufProcess(AbstractMultiConnectionProcess[Response]):
    """
    A process for reading IOBUF memory (mostly log messages) from a
    SpiNNaker core.

    Each core's IOBUF is read as a chain of regions.  Every region starts
    with a 16-byte header holding the address of the next region and the
    number of bytes of content in this region; the content follows the
    header.
    """
    __slots__ = (
        "_extra_reads",
        "_iobuf",
        "_iobuf_address",
        "_iobuf_view",
        "_next_reads")

    def __init__(self, connection_selector: ConnectionSelector) -> None:
        """
        :param ConnectionSelector connection_selector:
        """
        super().__init__(connection_selector)

        # A dictionary of (x, y, p) -> iobuf address
        self._iobuf_address: Dict[XYP, int] = dict()

        # A dictionary of (x, y, p) -> {region index n -> bytearray}
        self._iobuf: Dict[XYP, Dict[int, bytearray]] = defaultdict(dict)

        # A dictionary of (x, y, p) -> {region index n -> memoryview},
        # where each view wraps the matching bytearray in self._iobuf
        self._iobuf_view: Dict[XYP, Dict[int, memoryview]] = defaultdict(dict)

        # Extra reads that need to be done because a region did not fit
        # in the response to its first read
        self._extra_reads: List[_RegionTail] = list()

        # First reads of regions that still need to be done
        self._next_reads: List[_NextRegion] = list()

    def _request_iobuf_address(self, iobuf_size: int, x: int, y: int, p: int):
        """
        Request the word that holds the address of a core's first IOBUF
        region.

        :param int iobuf_size: The size of an IOBUF region's content
        :param int x: The x-coordinate of the chip holding the core
        :param int y: The y-coordinate of the chip holding the core
        :param int p: The ID of the core on the chip
        """
        # The read is addressed to processor 0 of the chip
        scamp_coords = (x, y, 0)
        base_address = get_vcpu_address(p) + CPU_IOBUF_ADDRESS_OFFSET
        self._send_request(
            ReadMemory(scamp_coords, base_address, 4),
            functools.partial(self.__handle_iobuf_address_response,
                              iobuf_size, scamp_coords, (x, y, p)))

    def __handle_iobuf_address_response(
            self, iobuf_size: int, scamp_coords: XYP, xyp: XYP,
            response: Response):
        """
        Queue the read of a core's first IOBUF region, if it has one.

        :param int iobuf_size: The size of an IOBUF region's content
        :param tuple(int,int,int) scamp_coords:
            The coordinates that reads for this core are addressed to
        :param tuple(int,int,int) xyp: The coordinates of the core
        :param Response response: The response holding the address word
        """
        iobuf_address, = _ONE_WORD.unpack_from(response.data, response.offset)

        # An address of zero means there is nothing to read for this core
        if iobuf_address != 0:
            # Ask for the 16-byte header plus as much content as fits
            first_read_size = min(iobuf_size + 16, UDP_MESSAGE_MAX_SIZE)
            self._next_reads.append(_NextRegion(
                scamp_coords, xyp, 0, iobuf_address, first_read_size))

    def _request_iobuf_region_tail(self, tail: _RegionTail):
        """
        Request a part of a region that was not covered by its first read.

        :param _RegionTail tail: The read to perform
        """
        self._send_request(
            ReadMemory(tail.scamp_coords, tail.base_address, tail.size),
            functools.partial(
                self.__handle_extra_iobuf_response, tail))

    def __handle_extra_iobuf_response(
            self, tail: _RegionTail, response: Response):
        """
        Copy the data of a follow-up read into its place in the region's
        buffer.

        :param _RegionTail tail: The read that this is the response to
        :param Response response: The response holding the data
        """
        view = self._iobuf_view[tail.core_coords][tail.n]
        base = tail.offset
        view[base:base + response.length] = response.data[
            response.offset:response.offset + response.length]

    def _request_iobuf_region(self, region: _NextRegion):
        """
        Request the first part (header and initial content) of a region.

        :param _NextRegion region: The region to start reading
        """
        self._send_request(
            ReadMemory(region.scamp_coords, region.next_address,
                       region.first_read_size),
            functools.partial(self.__handle_first_iobuf_response, region))

    def __handle_first_iobuf_response(
            self, region: _NextRegion, response: Response):
        """
        Handle the first part of a region: allocate its buffer, store the
        content received so far, and queue any further reads needed.

        :param _NextRegion region: The region that this is the response to
        :param Response response: The response holding the header and data
        """
        base_address = region.next_address

        # Unpack the iobuf header
        (next_address, bytes_to_read) = _FIRST_IOBUF.unpack_from(
            response.data, response.offset)

        # Create a buffer for the data
        data = bytearray(bytes_to_read)
        view = memoryview(data)
        self._iobuf[region.core_coords][region.n] = data
        self._iobuf_view[region.core_coords][region.n] = view

        # Put the data from this packet into the buffer; the packet holds
        # the 16-byte header first, and may hold more than the region uses
        packet_bytes = min(response.length - 16, bytes_to_read)
        if packet_bytes > 0:
            offset = response.offset + 16
            view[0:packet_bytes] = response.data[offset:offset + packet_bytes]

        bytes_to_read -= packet_bytes
        base_address += packet_bytes + 16
        read_offset = packet_bytes

        # While more reads need to be done to read the data
        while bytes_to_read > 0:
            # Read the next bit of memory making up the buffer
            next_bytes_to_read = min(bytes_to_read, UDP_MESSAGE_MAX_SIZE)
            self._extra_reads.append(region.tail(
                base_address, next_bytes_to_read, read_offset))
            base_address += next_bytes_to_read
            read_offset += next_bytes_to_read
            bytes_to_read -= next_bytes_to_read

        # If there is another IOBuf buffer, read this next
        if next_address != 0:
            self._next_reads.append(region.next_at(next_address))

    def read_iobuf(
            self, iobuf_size: int,
            core_subsets: CoreSubsets) -> Iterable[IOBuffer]:
        """
        Read the IOBUF contents of a set of cores.

        This is a generator, so no requests are sent until iteration
        starts.  One item is produced for every core in ``core_subsets``;
        a core for which nothing was read gets an empty string.

        :param int iobuf_size: The size of an IOBUF region's content
        :param ~spinn_machine.CoreSubsets core_subsets:
            The cores to read the IOBUF of
        :rtype: iterable(IOBuffer)
        """
        # Get the iobuf address for each core
        with self._collect_responses():
            for core_subset in core_subsets:
                x, y = core_subset.x, core_subset.y
                for p in core_subset.processor_ids:
                    self._request_iobuf_address(iobuf_size, x, y, p)

        # Run rounds of the process until reading is complete; handling
        # the responses of one round may queue reads for the next round
        while self._extra_reads or self._next_reads:
            with self._collect_responses():
                # Process the extra iobuf reads needed
                while self._extra_reads:
                    self._request_iobuf_region_tail(self._extra_reads.pop())

                # Process the next iobuf reads needed
                while self._next_reads:
                    self._request_iobuf_region(self._next_reads.pop())

        for core_subset in core_subsets:
            x, y = core_subset.x, core_subset.y
            for p in core_subset.processor_ids:
                # Regions are stored in the order they were discovered,
                # as a region is only queued once the one before it has
                # been stored, so the dict order is the chain order
                iobuf = "".join(
                    item.decode(_ENCODING)
                    for item in self._iobuf[x, y, p].values())
                yield IOBuffer(x, y, p, iobuf)