Source code for spinnman.processes.fill_process

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import struct
from enum import Enum
from spinnman.messages.scp.impl import FillRequest, WriteMemory
from spinnman.processes.abstract_multi_connection_process import (
    AbstractMultiConnectionProcess)

logger = logging.getLogger(__name__)
ALIGNMENT = 4


[docs]class FillDataType(Enum): WORD = (4, "<I") HALF_WORD = (2, "<2H") BYTE = (1, "<4B") def __new__(cls, value, struct_type, doc=""): # pylint: disable=protected-access, unused-argument obj = object.__new__(cls) obj._value_ = value obj._struct_type = struct_type obj._struct = struct.Struct(struct_type) return obj def __init__(self, value, struct_type, doc=""): self._value_ = value self._struct_type = struct_type self._struct = struct.Struct(struct_type) self.__doc__ = doc @property def struct_type(self): """ The struct descriptor for packing and unpacking 4 bytes-worth of\ this type. :rtype: str """ return self._struct_type @property def struct(self): """ An object that can pack and unpack 4 bytes-worth of this type. :rtype: struct.Struct """ return self._struct
[docs]class FillProcess(AbstractMultiConnectionProcess): """ A process for filling memory. """ __slots__ = [] _PACKS = [struct.Struct("<{}B".format(i)) for i in range(ALIGNMENT)] # pylint: disable=too-many-arguments def _write_pre_bytes(self, x, y, base, data_to_fill, size): extra_bytes = (ALIGNMENT - base % ALIGNMENT) % ALIGNMENT if not extra_bytes: return 0 # Pre_bytes is the first part of the data up to the first aligned # word pre_bytes = self._PACKS[extra_bytes].pack(*data_to_fill[:extra_bytes]) # Send the pre-data to make the memory aligned (or all the # data if the size is correct - note that pre_bytes[:size] will # return all of pre_bytes if size >= len(pre_bytes) pre_data = pre_bytes[:size] if not pre_data: return 0 self._send_request(WriteMemory(x, y, base, pre_data)) return extra_bytes def _write_fill(self, x, y, address, base, data_to_fill, size): extra_bytes = (ALIGNMENT - base % ALIGNMENT) % ALIGNMENT size = size - ALIGNMENT if extra_bytes else size if not size: return 0 # The data to send is the repeated fill data, from the end of the # pre-data circling round to the start of the post-data; we double # it up so that we don't need to use mod (it's pretty small). data = data_to_fill + data_to_fill fill_word = FillDataType.WORD.unpack( data[extra_bytes:extra_bytes + ALIGNMENT - 1])[0] self._send_request(FillRequest(x, y, address, fill_word, size)) return size def _write_post_bytes( self, x, y, address, base, data_to_fill, bytes_to_write): # Post bytes is the last part of the data from the end of the last # aligned word; the number of bytes to write here is exactly the # number of bytes later than a word boundary the initial address is. n_bytes = base % ALIGNMENT if not n_bytes or not bytes_to_write: return post_bytes = self._PACKS[n_bytes].pack(*data_to_fill[-n_bytes:]) self._send_request(WriteMemory( x, y, address, post_bytes[:bytes_to_write]))
[docs] def fill_memory(self, x, y, base_address, data, size, data_type): """ :type x: int :type y: int :type base_address: int :type data: int :type size: int :type data_type: spinnman.processes.fill_process.FillDataType """ # Don't do anything if there is nothing to do! if size == 0: return # Check that the data can fill the requested size if size % data_type.value: raise Exception( "The size of {} bytes to fill is not divisible by the size of" " the data of {} bytes".format(size, data_type.value)) if base_address % ALIGNMENT: logger.warning( "Unaligned fill starting at %d; please use aligned fills", base_address) # Get a word of data regardless of the type data_to_fill = bytearray(data_type.struct.pack( *([data] * (ALIGNMENT / data_type.value)))) written = 0 address = base_address # Send the pre-data to make the memory aligned (or all the # data if the size is correct - note that pre_bytes[:size] will # return all of pre_bytes if size >= len(pre_bytes) delta = self._write_pre_bytes(x, y, base_address, data_to_fill, size) written += delta address += delta # Fill as much as possible delta = self._write_fill( x, y, address, base_address, data_to_fill, size) written += delta address += delta # Post bytes is the last part of the data from the end of the last # aligned word; send them if required self._write_post_bytes( x, y, address, base_address, data_to_fill, size - written) # Wait for all the packets to be confirmed and then check there # are no errors self._finish() self.check_for_error()