Source code for spinnman.extended.write_memory_flood_process

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from typing import BinaryIO, Optional
from spinnman.messages.scp.impl import (
    FloodFillEnd, FloodFillStart, FloodFillData)
from spinnman.processes import (
    AbstractMultiConnectionProcess, ConnectionSelector)
from spinnman.constants import UDP_MESSAGE_MAX_SIZE


class WriteMemoryFloodProcess(AbstractMultiConnectionProcess):
    """
    A process for writing memory on multiple SpiNNaker chips at once.

    Every write follows the same sequence: a flood fill start message,
    a series of data blocks of at most ``UDP_MESSAGE_MAX_SIZE`` bytes each,
    and finally a flood fill end message.
    """
    __slots__ = ()

    def __init__(self, next_connection_selector: ConnectionSelector):
        """
        :param ConnectionSelector next_connection_selector:
            Forwarded to the base process, together with the fixed settings
            ``n_channels=3`` and ``intermediate_channel_waits=2``.
        """
        super().__init__(
            next_connection_selector, n_channels=3,
            intermediate_channel_waits=2)

    def _start_flood_fill(
            self, n_bytes: int, nearest_neighbour_id: int) -> None:
        """
        Send the flood fill start message for a write of ``n_bytes`` bytes.

        :param int n_bytes: The total number of bytes that will be written
        :param int nearest_neighbour_id: The ID of this flood fill
        """
        # NOTE(review): this counts 4-byte words per UDP_MESSAGE_MAX_SIZE,
        # whereas the write loops below send one block per
        # UDP_MESSAGE_MAX_SIZE *bytes*, so the value announced here looks
        # smaller than the number of blocks actually sent. Left unchanged;
        # confirm against the flood fill protocol before altering.
        n_blocks = int(math.ceil(math.ceil(n_bytes / 4.0) /
                                 UDP_MESSAGE_MAX_SIZE))
        with self._collect_responses():
            self._send_request(
                FloodFillStart(nearest_neighbour_id, n_blocks))

    def _end_flood_fill(self, nearest_neighbour_id: int) -> None:
        """
        Send the flood fill end message.

        :param int nearest_neighbour_id: The ID of this flood fill
        """
        with self._collect_responses():
            self._send_request(FloodFillEnd(nearest_neighbour_id))

    @staticmethod
    def _read_block(reader: BinaryIO, size: int) -> bytes:
        """
        Read up to ``size`` bytes from ``reader``, topping up short reads.

        A single ``read`` call on a raw stream may legitimately return fewer
        bytes than requested, so keep reading until the block is full or the
        reader yields no more data.

        :param reader: The stream to read from
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
        :param int size: The maximum number of bytes to read
        :return: The bytes read; shorter than ``size`` (possibly empty) only
            if the reader ran out of data
        :rtype: bytes
        """
        block = reader.read(size)
        if not block:
            return b""
        if len(block) >= size:
            # Common case: the first read satisfied the whole request.
            return block
        pieces = [block]
        n_read = len(block)
        while n_read < size:
            piece = reader.read(size - n_read)
            if not piece:
                break
            pieces.append(piece)
            n_read += len(piece)
        return b"".join(pieces)

    def write_memory_from_bytearray(
            self, nearest_neighbour_id: int, base_address: int,
            data: bytes, offset: int, n_bytes: Optional[int] = None) -> None:
        """
        Flood fill memory with bytes taken from an in-memory buffer.

        :param int nearest_neighbour_id: The ID of this flood fill
        :param int base_address: The address at which the first block is
            written; later blocks follow on contiguously
        :param data: The buffer holding the bytes to write
        :type data: bytes or bytearray
        :param int offset: The index in ``data`` of the first byte to write
        :param int n_bytes: The number of bytes to write; if `None`, all
            bytes of ``data`` from ``offset`` onwards are written
        """
        # pylint: disable=too-many-arguments
        if n_bytes is None:
            # Only the bytes from ``offset`` onwards are available; using
            # len(data) alone would run past the end for a non-zero offset.
            n_bytes = len(data) - offset
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        with self._collect_responses():
            data_offset = offset
            address = base_address
            block_no = 0
            remaining = int(n_bytes)
            while remaining > 0:
                bytes_to_send = min(remaining, UDP_MESSAGE_MAX_SIZE)
                self._send_request(FloodFillData(
                    nearest_neighbour_id, block_no, address,
                    data, data_offset, bytes_to_send))
                block_no += 1
                remaining -= bytes_to_send
                address += bytes_to_send
                data_offset += bytes_to_send

        self._end_flood_fill(nearest_neighbour_id)

    def write_memory_from_reader(
            self, nearest_neighbour_id: int, base_address: int,
            reader: BinaryIO, n_bytes: int) -> None:
        """
        Flood fill memory with bytes read from a stream.

        If the reader runs out of data before ``n_bytes`` bytes have been
        read, the bytes obtained so far are written and no further data
        blocks are sent; the flood fill end message is still sent.

        :param int nearest_neighbour_id: The ID of this flood fill
        :param int base_address: The address at which the first block is
            written; later blocks follow on contiguously
        :param reader: The stream to read the bytes from
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
        :param int n_bytes: The number of bytes to read and write
        """
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        with self._collect_responses():
            address = base_address
            block_no = 0
            remaining = int(n_bytes)
            while remaining > 0:
                block = self._read_block(
                    reader, min(remaining, UDP_MESSAGE_MAX_SIZE))
                if not block:
                    # The reader is exhausted; do not send empty blocks.
                    break
                self._send_request(FloodFillData(
                    nearest_neighbour_id, block_no, address,
                    block, 0, len(block)))
                # Advance by what was actually read so that the addresses
                # stay contiguous even when the final block is short.
                block_no += 1
                remaining -= len(block)
                address += len(block)

        self._end_flood_fill(nearest_neighbour_id)