# Source code for spinnman.utilities.io.file_io

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import struct
from spinn_utilities.overrides import overrides
from spinnman.utilities.io.abstract_io import AbstractIO
from spinnman.processes.fill_process import FillDataType


class FileIO(AbstractIO):
    """ A file input/output interface to match the MemoryIO interface

    An instance is a window onto the bytes between a start offset
    (inclusive) and an end offset (exclusive) of an underlying file
    object.  Indexing or slicing an instance returns a narrower window
    that shares the same file object, so closing any window closes the
    file for all of them.
    """

    __slots__ = [
        # The file to write to
        "_file",

        # The current offset in the file
        "_current_offset",

        # The start offset in the file
        "_start_offset",

        # The end offset in the file
        "_end_offset"
    ]

    def __init__(self, file_obj, start_offset, end_offset):
        """
        :param file_obj: The file handle or file name to write to
        :type file_obj: str or file
        :param start_offset: The start offset into the file
        :type start_offset: int
        :param end_offset: The end offset from the start of the file
        :type end_offset: int or None
        """
        # NOTE(review): None is documented for end_offset, but every other
        # method does arithmetic or comparisons on it - confirm whether
        # None is really supported by callers.
        self._file = file_obj
        if isinstance(file_obj, str):
            # A file name was given: open it for binary read/write.  Mode
            # "wb+" creates the file, truncating any existing content.
            self._file = open(file_obj, "wb+")

        self._current_offset = start_offset
        self._start_offset = start_offset
        self._end_offset = end_offset

    @overrides(AbstractIO.__len__)
    def __len__(self):
        # Size of the window in bytes
        return self._end_offset - self._start_offset

    @overrides(AbstractIO.__getitem__)
    def __getitem__(self, new_slice):
        # Returns a new FileIO over a sub-range of this window; the
        # underlying file object is shared, not copied.
        if isinstance(new_slice, int):
            size = len(self)
            index = new_slice
            if index < 0:
                # Negative indices count back from the end of the window
                index += size
            if index < 0 or index >= size:
                raise ValueError(
                    "Index {} out of range".format(new_slice))
            return FileIO(
                self._file, self._start_offset + index,
                self._start_offset + index + 1)

        elif isinstance(new_slice, slice):
            if new_slice.step is not None and new_slice.step != 1:
                raise ValueError("Slice must be contiguous")

            # Default to the whole of this window
            start_offset = self._start_offset
            end_offset = self._end_offset

            if new_slice.start is not None:
                if new_slice.start < 0:
                    # Negative start is relative to the end of the window
                    start_offset = self._end_offset + new_slice.start
                else:
                    start_offset = self._start_offset + new_slice.start

            if new_slice.stop is not None:
                # A stop of 0 is a position relative to the start (giving
                # an empty range), not an offset from the end
                if new_slice.stop >= 0:
                    end_offset = self._start_offset + new_slice.stop
                else:
                    end_offset = self._end_offset + new_slice.stop

            if (start_offset < self._start_offset or
                    end_offset < self._start_offset or
                    end_offset > self._end_offset or
                    start_offset > self._end_offset):
                raise ValueError("Slice {} outside of this region".format(
                    new_slice))

            # A start at or beyond the stop selects no bytes at all
            if start_offset >= end_offset:
                raise ValueError("Zero sized regions are not supported")

            return FileIO(self._file, start_offset, end_offset)

    @overrides(AbstractIO.__enter__)
    def __enter__(self):
        return self

    @overrides(AbstractIO.__exit__)
    def __exit__(self, exception_type, exception_value, traceback):
        # Leaving the "with" block closes the underlying file
        self.close()

    @overrides(AbstractIO.close)
    def close(self):
        self._file.close()

    @property
    @overrides(AbstractIO.closed)
    def closed(self):
        return self._file.closed

    @overrides(AbstractIO.flush)
    def flush(self):
        self._file.flush()

    @overrides(AbstractIO.seek)
    def seek(self, n_bytes, from_what=os.SEEK_SET):
        # Work out the absolute file offset that is being requested
        if from_what == os.SEEK_SET:
            position = self._start_offset + n_bytes
        elif from_what == os.SEEK_CUR:
            position = self._current_offset + n_bytes
        elif from_what == os.SEEK_END:
            position = self._end_offset + n_bytes
        else:
            raise ValueError(
                "Value of from_what must be one of os.SEEK_SET, os.SEEK_CUR"
                " or os.SEEK_END")

        # The end offset itself is a valid position (nothing left to read)
        if position < self._start_offset or position > self._end_offset:
            raise ValueError(
                "Attempt to seek to a position of {} which is outside of the"
                " region".format(position))

        # Only the logical position is updated here; the file itself is
        # positioned immediately before each read, write or fill
        self._current_offset = position

    @overrides(AbstractIO.tell)
    def tell(self):
        # Position relative to the start of this window
        return self._current_offset - self._start_offset

    @property
    @overrides(AbstractIO.address)
    def address(self):
        # Absolute position within the underlying file
        return self._current_offset

    @overrides(AbstractIO.read)
    def read(self, n_bytes=None):
        if n_bytes == 0:
            return b""

        # None or a negative count means "everything up to the end"
        if n_bytes is None or n_bytes < 0:
            n_bytes = self._end_offset - self._current_offset

        if self._current_offset + n_bytes > self._end_offset:
            raise EOFError

        # Windows share one file object, so always reposition first
        self._file.seek(self._current_offset)
        data = bytes(self._file.read(n_bytes))
        self._current_offset += n_bytes
        return data

    @overrides(AbstractIO.write)
    def write(self, data):
        n_bytes = len(data)

        # Refuse to write past the end of this window
        if self._current_offset + n_bytes > self._end_offset:
            raise EOFError

        # Windows share one file object, so always reposition first
        self._file.seek(self._current_offset)
        self._file.write(data)
        self._current_offset += n_bytes
        return n_bytes

    @overrides(AbstractIO.fill)
    def fill(self, repeat_value, bytes_to_fill=None,
             data_type=FillDataType.WORD):
        # By default fill from the current position to the end of the window
        if bytes_to_fill is None:
            bytes_to_fill = self._end_offset - self._current_offset

        if self._current_offset + bytes_to_fill > self._end_offset:
            raise EOFError
        if bytes_to_fill % data_type.value != 0:
            raise ValueError(
                "The size of {} bytes to fill is not divisible by the size of"
                " the data of {} bytes".format(bytes_to_fill, data_type.value))

        # Only the last character of the struct format is used.
        # NOTE(review): presumably this strips a byte-order/count prefix so
        # that exactly one value is packed (in native byte order) - confirm
        # against FillDataType.
        data_to_fill = struct.pack(
            str(data_type.struct_type[-1]), repeat_value
        )

        self._file.seek(self._current_offset)
        for _ in range(bytes_to_fill // data_type.value):
            self._file.write(data_to_fill)
        self._current_offset += bytes_to_fill