Source code for spinnman.processes.read_iobuf_process

import functools
import struct
from collections import defaultdict, OrderedDict

from spinnman.model import IOBuffer
from spinnman.utilities.utility_functions import get_vcpu_address
from spinnman.messages.scp.impl import ReadMemory
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from spinnman.constants import UDP_MESSAGE_MAX_SIZE, CPU_IOBUF_ADDRESS_OFFSET

_ENCODING = "ascii"


[docs]class ReadIOBufProcess(AbstractMultiConnectionProcess): """ A process for reading memory """ def __init__(self, connection_selector): AbstractMultiConnectionProcess.__init__(self, connection_selector) # A dictionary of (x, y, p) -> iobuf address self._iobuf_address = dict() # A dictionary of (x, y, p) -> OrderedDict(n) -> bytearray self._iobuf = defaultdict(OrderedDict) # A dictionary of (x, y, p) -> OrderedDict(n) -> memoryview self._iobuf_view = defaultdict(OrderedDict) # A list of extra reads that need to be done as a result of the first # read = list of (x, y, p, n, base_address, size, offset) self._extra_reads = list() # A list of next reads that need to be done as a result of the first # read = list of (x, y, p, n, next_address, first_read_size) self._next_reads = list()
[docs] def handle_iobuf_address_response(self, iobuf_size, x, y, p, response): iobuf_address = struct.unpack_from( "<I", response.data, response.offset)[0] if iobuf_address != 0: first_read_size = min((iobuf_size + 16, UDP_MESSAGE_MAX_SIZE)) self._next_reads.append(( x, y, p, 0, iobuf_address, first_read_size))
def _request_iobuf_region(self, region): (x, y, p, n, next_address, first_read_size) = region self._send_request( ReadMemory(x, y, next_address, first_read_size), functools.partial(self.handle_first_iobuf_response, x, y, p, n, next_address, first_read_size))
[docs] def handle_first_iobuf_response(self, x, y, p, n, base_address, first_read_size, response): # Unpack the iobuf header (next_address, bytes_to_read) = struct.unpack_from( "<I8xI", response.data, response.offset) # Create a buffer for the data data = bytearray(bytes_to_read) view = memoryview(data) self._iobuf[(x, y, p)][n] = data self._iobuf_view[(x, y, p)][n] = view # Put the data from this packet into the buffer packet_bytes = response.length - 16 if packet_bytes > bytes_to_read: packet_bytes = bytes_to_read offset = response.offset + 16 if packet_bytes > 0: view[0:packet_bytes] = response.data[ offset:(offset + packet_bytes)] bytes_to_read -= packet_bytes base_address += packet_bytes + 16 read_offset = packet_bytes # While more reads need to be done to read the data while bytes_to_read > 0: # Read the next bit of memory making up the buffer next_bytes_to_read = min((bytes_to_read, UDP_MESSAGE_MAX_SIZE)) self._extra_reads.append((x, y, p, n, base_address, next_bytes_to_read, read_offset)) base_address += next_bytes_to_read read_offset += next_bytes_to_read bytes_to_read -= next_bytes_to_read # If there is another IOBuf buffer, read this next if next_address != 0: self._next_reads.append((x, y, p, n + 1, next_address, first_read_size))
def _request_iobuf_region_tail(self, extra_region): (x, y, p, n, base_address, size, offset) = extra_region self._send_request( ReadMemory(x, y, base_address, size), functools.partial(self.handle_extra_iobuf_response, x, y, p, n, offset))
[docs] def handle_extra_iobuf_response(self, x, y, p, n, offset, response): view = self._iobuf_view[(x, y, p)][n] view[offset:offset + response.length] = response.data[ response.offset:response.offset + response.length]
def _request_iobuf_address(self, iobuf_size, x, y, p): base_address = get_vcpu_address(p) + CPU_IOBUF_ADDRESS_OFFSET self._send_request( ReadMemory(x, y, base_address, 4), functools.partial(self.handle_iobuf_address_response, iobuf_size, x, y, p))
[docs] def read_iobuf(self, iobuf_size, core_subsets): # Get the iobuf address for each core for core_subset in core_subsets: x = core_subset.x y = core_subset.y for p in core_subset.processor_ids: self._request_iobuf_address(iobuf_size, x, y, p) self._finish() self.check_for_error() # Run rounds of the process until reading is complete while len(self._extra_reads) > 0 or len(self._next_reads) > 0: # Process the extra iobuf reads needed while self._extra_reads: self._request_iobuf_region_tail(self._extra_reads.pop()) # Process the next iobuf reads needed while self._next_reads: self._request_iobuf_region(self._next_reads.pop()) # Finish this round self._finish() self.check_for_error() for core_subset in core_subsets: x = core_subset.x y = core_subset.y for p in core_subset.processor_ids: iobufs = self._iobuf[(x, y, p)] iobuf = "" for item in iobufs.itervalues(): iobuf += item.decode(_ENCODING) yield IOBuffer(x, y, p, iobuf)