Source code for spinnman.utilities.io.memory_io

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import struct
from spinn_utilities.overrides import overrides
from spinnman.utilities.io.abstract_io import AbstractIO
from spinnman.processes.fill_process import FillDataType

# A set of ChipMemoryIO objects that have been created,
# indexed by transceiver, x and y (thus two transceivers might not see the
# same buffered memory)
_chip_memory_io_objects = dict()

# Start of SDRAM, using *unbuffered* memory access protocol.
UNBUFFERED_SDRAM_START = 0x60000000


def _get_chip_memory_io(transceiver, x, y):
    if (transceiver, x, y) not in _chip_memory_io_objects:
        _chip_memory_io_objects[transceiver, x, y] = _ChipMemoryIO(
            transceiver, x, y)
    return _chip_memory_io_objects[transceiver, x, y]


class _ChipMemoryIO(object):
    """ A file-like object for the memory of a chip
    """

    __slots__ = [

        # The transceiver for speaking to the machine
        "_transceiver",

        # The x coordinate of the chip to communicate with
        "_x",

        # The y coordinate of the chip to communicate with
        "_y",

        # The current pointer where read and writes are taking place
        "_current_address",

        # The current pointer where the next buffered write will occur
        "_write_address",

        # The write buffer size
        "_buffer_size",

        # The write buffer bytearray
        "_write_buffer",

        # The write buffer memory view
        "_write_memory_view",

        # The write buffer pointer
        "_write_buffer_offset"
    ]

    def __init__(
            self, transceiver, x, y, base_address=UNBUFFERED_SDRAM_START,
            buffer_size=256):
        """
        :param transceiver: The transceiver to read and write with
        :param x: The x-coordinate of the chip to write to
        :param y: The y-coordinate of the chip to write to
        :param base_address: The lowest address that can be written
        :param buffer_size: The size of the write buffer to improve efficiency
        """
        # pylint: disable=too-many-arguments
        self._transceiver = transceiver
        self._x = x
        self._y = y
        self._current_address = base_address
        self._write_address = base_address
        self._buffer_size = buffer_size
        self._write_buffer = bytearray(self._buffer_size)
        self._write_memory_view = memoryview(self._write_buffer)
        self._write_buffer_offset = 0

    @property
    def transceiver(self):
        return self._transceiver

    @property
    def x(self):
        return self._x

    @property
    def y(self):
        return self._y

    def flush_write_buffer(self):
        """ Force the writing of the current write buffer
        """
        if self._write_buffer_offset > 0:
            self._transceiver.write_memory(
                self._x, self._y, self._write_address, self._write_buffer,
                n_bytes=self._write_buffer_offset)
            self._write_address += self._write_buffer_offset
            self._write_buffer_offset = 0

    @property
    def current_address(self):
        """ Return the current absolute address within the region
        """
        return self._current_address

    @current_address.setter
    def current_address(self, address):
        """ Seek to a position within the region
        """
        self.flush_write_buffer()
        self._current_address = address
        self._write_address = address

    def read(self, n_bytes):
        """ Read a number of bytes

        :param n_bytes: The number of bytes to read
        :rtype: bytes
        """
        if n_bytes == 0:
            return b""

        self.flush_write_buffer()
        data = self._transceiver.read_memory(
            self._x, self._y, self._current_address, n_bytes)
        self._current_address += n_bytes
        self._write_address = self._current_address

        return data

    def write(self, data):
        """ Write some data

        :param data: The data to write
        :type data: bytes
        """
        n_bytes = len(data)

        if n_bytes >= self._buffer_size:
            self.flush_write_buffer()
            self._transceiver.write_memory(
                self._x, self._y, self._current_address, data)
            self._current_address += n_bytes
            self._write_address = self._current_address

        else:
            n_bytes_to_copy = min(
                n_bytes, self._buffer_size - self._write_buffer_offset)
            self._write_memory_view[
                self._write_buffer_offset:
                self._write_buffer_offset + n_bytes_to_copy
            ] = data[:n_bytes_to_copy]
            self._write_buffer_offset += n_bytes_to_copy
            self._current_address += n_bytes_to_copy
            n_bytes -= n_bytes_to_copy
            if self._write_buffer_offset == self._buffer_size:
                self.flush_write_buffer()
            if n_bytes > 0:
                self._write_memory_view[:n_bytes] = data[n_bytes_to_copy:]
                self._write_buffer_offset += n_bytes
                self._current_address += n_bytes

    def fill(self, repeat_value, bytes_to_fill, data_type=FillDataType.WORD):
        """ Fill the memory with repeated data

        :param repeat_value: The value to repeat
        :type repeat_value: int
        :param bytes_to_fill: Number of bytes to fill from current position
        :type bytes_to_fill: int
        :param data_type: The type of the repeat value
        :type data_type: \
            :py:class:`spinnman.processes.fill_process.FillDataType`
        """
        self.flush_write_buffer()
        self._transceiver.fill_memory(
            self._x, self._y, self._current_address, repeat_value,
            bytes_to_fill, data_type)
        self._current_address += bytes_to_fill


[docs]class MemoryIO(AbstractIO): """ A file-like object for reading and writing memory """ __slots__ = [ # The transceiver for speaking to the machine "_chip_memory_io", # The start address of the region to write to "_start_address", # The current pointer where read and writes are taking place "_current_address", # The end of the region to write to "_end_address" ] # Cache of all writes that are currently buffered in any instance, to # ensure they are written before a read occurs __write_cache__ = dict() def __init__(self, transceiver, x, y, start_address, end_address): """ :param transceiver: The transceiver to read and write with :param x: The x-coordinate of the chip to write to :param y: The y-coordinate of the chip to write to :param start_address: The start address of the region to write to :param end_address:\ The end address of the region to write to. This is the first\ address just outside the region """ # pylint: disable=too-many-arguments if start_address >= end_address: raise ValueError("Start address must be less than end address") self._chip_memory_io = _get_chip_memory_io(transceiver, x, y) self._start_address = start_address self._current_address = start_address self._end_address = end_address @overrides(AbstractIO.__len__) def __len__(self): return self._end_address - self._start_address @overrides(AbstractIO.__getitem__) def __getitem__(self, new_slice): if isinstance(new_slice, int): if new_slice >= len(self): raise ValueError("Index {} out of range".format) return MemoryIO( self._chip_memory_io.transceiver, self._chip_memory_io.x, self._chip_memory_io.y, self._start_address + new_slice, self._start_address + new_slice + 1) elif isinstance(new_slice, slice): if new_slice.step is not None and new_slice.step != 1: raise ValueError("Slice must be contiguous") start_address = self._start_address end_address = self._end_address if new_slice.start is not None: if new_slice.start < 0: start_address = self._end_address + new_slice.start else: start_address = self._start_address + new_slice.start if new_slice.stop is not None: if new_slice.stop > 0: end_address = self._start_address + new_slice.stop else: end_address = self._end_address + new_slice.stop if (start_address < self._start_address or end_address < self._start_address or end_address > self._end_address or start_address > self._end_address): raise ValueError("Slice {} outside of this region".format( new_slice)) if start_address == end_address: raise ValueError("Zero sized regions are not supported") return MemoryIO( self._chip_memory_io.transceiver, self._chip_memory_io.x, self._chip_memory_io.y, start_address, end_address) @overrides(AbstractIO.__enter__) def __enter__(self): return self @overrides(AbstractIO.__exit__) def __exit__(self, exception_type, exception_value, traceback): self.close() return False
[docs] @overrides(AbstractIO.close) def close(self): self._chip_memory_io.flush_write_buffer()
@property @overrides(AbstractIO.closed) def closed(self): return False
[docs] @overrides(AbstractIO.flush) def flush(self): self._chip_memory_io.flush_write_buffer()
[docs] @overrides(AbstractIO.seek) def seek(self, n_bytes, from_what=os.SEEK_SET): """ Seek to a position within the region """ position = 0 if from_what == os.SEEK_SET: position = self._start_address + n_bytes elif from_what == os.SEEK_CUR: position = self._current_address + n_bytes elif from_what == os.SEEK_END: position = self._end_address + n_bytes else: raise ValueError( "Value of from_what must be one of os.SEEK_SET, os.SEEK_CUR" " or os.SEEK_END") if position < self._start_address or position > self._end_address: raise ValueError( "Attempt to seek to a position of {} which is outside of the" " region".format(position)) self._current_address = position
[docs] @overrides(AbstractIO.tell) def tell(self): return self._current_address - self._start_address
@property def address(self): """ Return the current absolute address within the region """ return self._current_address
[docs] @overrides(AbstractIO.read) def read(self, n_bytes=None): bytes_to_read = n_bytes if n_bytes is None or n_bytes < 0: bytes_to_read = self._end_address - self._current_address if self._current_address + bytes_to_read > self._end_address: raise EOFError self._chip_memory_io.current_address = self._current_address data = bytes(self._chip_memory_io.read(bytes_to_read)) self._current_address += bytes_to_read return data
[docs] @overrides(AbstractIO.write) def write(self, data): n_bytes = len(data) if self._current_address + n_bytes > self._end_address: raise EOFError self._chip_memory_io.current_address = self._current_address self._chip_memory_io.write(data) self._current_address += n_bytes return n_bytes
[docs] @overrides(AbstractIO.fill) def fill(self, repeat_value, bytes_to_fill=None, data_type=FillDataType.WORD): if bytes_to_fill is None: bytes_to_fill = self._end_address - self._current_address if self._current_address + bytes_to_fill > self._end_address: raise EOFError if bytes_to_fill % data_type.value != 0: raise ValueError( "The size of {} bytes to fill is not divisible by the size of" " the data of {} bytes".format(bytes_to_fill, data_type.value)) data_to_fill = struct.pack( "{}".format(data_type.struct_type[-1]), repeat_value) self._chip_memory_io.current_address = self._current_address for _ in range(bytes_to_fill // data_type.value): self._chip_memory_io.write(data_to_fill) self._current_address += bytes_to_fill