Source code for spinnman.connections.scp_request_pipeline

import sys
import time

from spinnman.messages.scp.enums import SCPResult
from spinnman.exceptions import SpinnmanTimeoutException

# Exclusive upper bound on sequence numbers: send_request advances the
# module-level counter modulo this value, so numbers wrap back to 0
MAX_SEQUENCE = 65536

# Keep a global track of the sequence numbers used.  This is module-level
# state (not per-instance): send_request writes the current value into the
# request's SCP header and then advances it
_next_sequence = 0


class SCPRequestPipeLine(object):
    """ Allows a set of SCP requests to be grouped together in a
        communication across a number of channels for a given connection.

        This class implements an SCP windowing, first suggested by Andrew
        Mundy.  This extends the idea by having both send and receive
        windows.  These are represented by the n_channels and the
        intermediate_channel_waits parameters respectively.  This seems to
        help with the timeout issue; when a timeout is received, all
        requests for which a reply has not been received can also timeout.
    """

    def __init__(self, connection, n_channels=1,
                 intermediate_channel_waits=0, retry_codes=None,
                 n_retries=3, packet_timeout=0.5):
        """
        :param connection: The connection over which the communication is\
            to take place
        :param n_channels: The number of requests to send before checking\
            for responses.  If None, this will be determined automatically
        :param intermediate_channel_waits: The number of outstanding\
            responses to wait for before continuing sending requests.\
            If None, this will be determined automatically
        :param retry_codes: The set of response codes that will be\
            retried.  If None (the default), RC_TIMEOUT, RC_P2P_TIMEOUT,\
            RC_LEN and RC_P2P_NOREPLY are used
        :param n_retries: The number of times to resend any packet for any\
            reason before an error is triggered
        :param packet_timeout: The number of elapsed seconds after sending\
            a packet before it is considered a timeout.
        """
        # Build the default per instance rather than sharing one mutable
        # set object between every pipeline via the signature
        if retry_codes is None:
            retry_codes = {
                SCPResult.RC_TIMEOUT, SCPResult.RC_P2P_TIMEOUT,
                SCPResult.RC_LEN, SCPResult.RC_P2P_NOREPLY}

        self._connection = connection
        self._n_channels = n_channels
        self._intermediate_channel_waits = intermediate_channel_waits
        self._retry_codes = retry_codes
        self._n_retries = n_retries
        self._packet_timeout = packet_timeout

        # With a known send window but no receive window, leave a gap of 8
        # between the two (never going below 0)
        if (self._n_channels is not None and
                self._intermediate_channel_waits is None):
            self._intermediate_channel_waits = max(self._n_channels - 8, 0)

        # A dictionary of sequence number -> requests in progress
        self._requests = dict()

        # A dictionary of sequence number -> data sent for the request
        self._request_data = dict()

        # A dictionary of sequence number -> time at which sequence was
        # sent (kept for compatibility; not read anywhere in this class)
        self._times_sent = dict()

        # A dictionary of sequence number -> number of retries for the
        # packet
        self._retries = dict()

        # A dictionary of sequence number -> callback function for response
        self._callbacks = dict()

        # A dictionary of sequence number -> callback function for errors
        self._error_callbacks = dict()

        # A dictionary of sequence number -> time of the most recent send,
        # used to decide whether a request has timed out
        self._send_time = dict()

        # The number of responses outstanding
        self._in_progress = 0

        # The number of timeouts that occurred
        self._n_timeouts = 0

        # The number of packets that have been resent after a timeout
        self._n_resent = 0

        # The number of packets resent because of a retry response code
        self._n_retry_code_resent = 0

    def send_request(self, request, callback, error_callback):
        """ Add an SCP request to the set to be sent

        :param request: The SCP request to be sent
        :param callback: A callback function to call when the response\
            has been received; takes SCPResponse as a parameter, or None\
            if the response doesn't need to be processed
        :param error_callback: A callback function to call when an error\
            is found when processing the message; takes original\
            SCPRequest, exception caught and a list of tuples of\
            (filename, line number, function name, text) as a traceback
        """
        global _next_sequence

        # If the connection has not been measured
        if self._n_channels is None:
            if self._connection.is_ready_to_receive():
                self._n_channels = max(self._in_progress + 8, 12)
                self._intermediate_channel_waits = self._n_channels - 8

        # If all the channels are used, start to receive packets.  This is
        # done *before* the new request is recorded: otherwise a slow drain
        # could see the not-yet-sent request as timed out, resend it and
        # corrupt the in-progress count
        while (self._n_channels is not None and
                self._in_progress >= self._n_channels):
            self._do_retrieve(self._intermediate_channel_waits, 0.1)

        # Update the packet and store required details
        sequence = _next_sequence
        request.scp_request_header.sequence = sequence
        request_data = self._connection.get_scp_data(request)
        _next_sequence = (sequence + 1) % MAX_SEQUENCE
        self._requests[sequence] = request
        self._request_data[sequence] = request_data
        self._retries[sequence] = self._n_retries
        self._callbacks[sequence] = callback
        self._error_callbacks[sequence] = error_callback
        self._send_time[sequence] = time.time()

        # Send the request, keeping track of how many are sent; a request
        # that could not be sent must not be left recorded as outstanding
        try:
            self._connection.send(request_data)
        except Exception:
            self._remove_record(sequence)
            raise
        self._in_progress += 1

    def finish(self):
        """ Indicate the end of the packets to be sent.  This must be\
            called to ensure that all responses are received and handled.
        """
        while self._in_progress > 0:
            self._do_retrieve(0, self._packet_timeout)

    @property
    def n_timeouts(self):
        """ The number of receive timeouts that have been handled
        """
        return self._n_timeouts

    @property
    def n_channels(self):
        """ The number of requests sent before checking for responses\
            (None until it has been determined)
        """
        return self._n_channels

    @property
    def n_resent(self):
        """ The number of packets resent after timing out
        """
        return self._n_resent

    @property
    def n_retry_code_resent(self):
        """ The number of packets resent because of a retry response code
        """
        return self._n_retry_code_resent

    def _remove_record(self, seq):
        """ Forget everything stored about a sequence number

        Safe to call when some or all of the entries have already gone
        (e.g. the request entry is removed when a timeout is handled).

        :param seq: The sequence number to forget
        """
        for records in (self._send_time, self._requests,
                        self._request_data, self._retries,
                        self._callbacks, self._error_callbacks):
            records.pop(seq, None)

    def _single_retrieve(self, timeout):
        """ Receive one response and process it if it matches a request

        :param timeout: The timeout passed to the connection's receive
        """
        # Receive the next response
        result, seq, raw_data, offset = \
            self._connection.receive_scp_response(timeout)

        # Only process responses which have matching requests
        if seq not in self._requests:
            return
        request_sent = self._requests[seq]

        # If the response can be retried, retry it, as long as the
        # timeout hasn't expired
        elapsed = time.time() - self._send_time[seq]
        if result in self._retry_codes and elapsed < self._packet_timeout:
            self._connection.send(self._request_data[seq])
            self._n_retry_code_resent += 1
            return

        # No retry is possible - try constructing the result
        try:
            response = request_sent.get_scp_response()
            response.read_bytestring(raw_data, offset)
            if self._callbacks[seq] is not None:
                self._callbacks[seq](response)
        except Exception as e:
            self._error_callbacks[seq](
                request_sent, e, sys.exc_info()[2])

        # Remove the sequence from the outstanding responses
        self._remove_record(seq)
        self._in_progress -= 1

    def _handle_receive_timeout(self, to_resend):
        """ Move every request that has timed out into the resend list

        :param to_resend: A list to which (sequence, request) tuples of\
            timed-out requests are appended
        """
        self._n_timeouts += 1

        # If there is a timeout, all packets whose own timeout has expired
        # are resent.  The list is built in full before any entry is
        # deleted, so the dictionary is never changed while being iterated
        time_now = time.time()
        timed_out = [
            (seq, request_sent)
            for seq, request_sent in self._requests.items()
            if time_now - self._send_time[seq] >= self._packet_timeout]
        for seq, request_sent in timed_out:
            to_resend.append((seq, request_sent))
            self._in_progress -= 1
            del self._requests[seq]

    def _resend(self, seq, request_sent):
        """ Resend a timed-out request if it has retries remaining

        :param seq: The sequence number of the request
        :param request_sent: The request to resend
        :raise SpinnmanTimeoutException: If no retries remain
        """
        if self._retries[seq] <= 0:
            # No retries left - report it as a timeout
            raise SpinnmanTimeoutException(
                request_sent.scp_request_header.command,
                self._packet_timeout)

        # Send before touching the bookkeeping, so that a failed send
        # doesn't leave the request counted as in progress
        self._connection.send(self._request_data[seq])
        self._retries[seq] -= 1
        self._requests[seq] = request_sent
        self._send_time[seq] = time.time()
        self._in_progress += 1
        self._n_resent += 1

    def _do_retrieve(self, n_packets, timeout):
        """ Receives responses until there are only n_packets responses left

        :param n_packets: The number of packets that can remain after\
            running
        :param timeout: The timeout used for each receive
        """
        # Keep a list of packets to resend
        to_resend = list()

        # While there are still more packets in progress than some threshold
        while self._in_progress > n_packets:
            try:
                # Receive the next response
                self._single_retrieve(timeout)
            except SpinnmanTimeoutException:
                self._handle_receive_timeout(to_resend)

        # Try to resend the packets; any failure (including running out of
        # retries) is reported through the request's error callback
        for seq, request_sent in to_resend:
            try:
                self._resend(seq, request_sent)
            except Exception as e:
                self._error_callbacks[seq](
                    request_sent, e, sys.exc_info()[2])
                self._remove_record(seq)