Source code for spinnman.processes.write_memory_flood_process

from spinnman.messages.scp.impl import \
    FloodFillEnd, FloodFillStart, FloodFillData
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from spinnman.constants import UDP_MESSAGE_MAX_SIZE

import math


[docs]class WriteMemoryFloodProcess(AbstractMultiConnectionProcess): """ A process for writing memory """ def __init__(self, connection_selector): AbstractMultiConnectionProcess.__init__(self, connection_selector) def _start_flood_fill(self, n_bytes, nearest_neighbour_id): n_blocks = int(math.ceil(math.ceil(n_bytes / 4.0) / UDP_MESSAGE_MAX_SIZE)) self._send_request( FloodFillStart(nearest_neighbour_id, n_blocks)) self._finish() self.check_for_error() def _end_flood_fill(self, nearest_neighbour_id): self._send_request(FloodFillEnd(nearest_neighbour_id)) self._finish() self.check_for_error()
[docs] def write_memory_from_bytearray(self, nearest_neighbour_id, base_address, data, offset, n_bytes): self._start_flood_fill(n_bytes, nearest_neighbour_id) data_offset = offset offset = base_address block_no = 0 while n_bytes > 0: bytes_to_send = int(n_bytes) if bytes_to_send > UDP_MESSAGE_MAX_SIZE: bytes_to_send = UDP_MESSAGE_MAX_SIZE self._send_request(FloodFillData( nearest_neighbour_id, block_no, offset, data, data_offset, bytes_to_send)) block_no += 1 n_bytes -= bytes_to_send offset += bytes_to_send data_offset += bytes_to_send self._finish() self.check_for_error() self._end_flood_fill(nearest_neighbour_id)
[docs] def write_memory_from_reader(self, nearest_neighbour_id, base_address, data, n_bytes): self._start_flood_fill(n_bytes, nearest_neighbour_id) offset = base_address block_no = 0 while n_bytes > 0: bytes_to_send = int(n_bytes) if bytes_to_send > UDP_MESSAGE_MAX_SIZE: bytes_to_send = UDP_MESSAGE_MAX_SIZE data_array = data.read(bytes_to_send) self._send_request(FloodFillData( nearest_neighbour_id, block_no, offset, data_array, 0, len(data_array))) block_no += 1 n_bytes -= bytes_to_send offset += bytes_to_send self._finish() self.check_for_error() self._end_flood_fill(nearest_neighbour_id)