# Source code for spinnman.processes.most_direct_connection_selector
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY
from spinnman.data import SpiNNManDataView
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.messages.scp.abstract_messages import AbstractSCPRequest
from .abstract_multi_connection_process_connection_selector import (
ConnectionSelector)
class MostDirectConnectionSelector(ConnectionSelector):
    """
    A selector that goes for the most direct connection for the message.

    Connections are indexed by the ``(chip_x, chip_y)`` they report.
    A "lead" connection is kept as the fallback choice: the connection
    at chip (0, 0) when one was supplied, otherwise the first connection
    supplied.
    """
    __slots__ = (
        "_connections",
        "_lead_connection")

    def __init__(self, connections: List[SCAMPConnection]):
        """
        :param list(SCAMPConnection) connections:
            The connections to be used; at least one is required
        :raises ValueError: If no connections are supplied
        """
        self._connections: Dict[XY, SCAMPConnection] = dict()
        lead_connection = None
        first_connection = None
        for conn in connections:
            # Remember the first connection in the same pass so that the
            # fallback below does not need to iterate the input again.
            if first_connection is None:
                first_connection = conn
            if conn.chip_x == 0 and conn.chip_y == 0:
                lead_connection = conn
            # A later connection with the same coordinates replaces an
            # earlier one.
            self._connections[conn.chip_x, conn.chip_y] = conn
        if lead_connection is None:
            # Nothing at (0, 0): fall back to the first connection given.
            if first_connection is None:
                # Fail with a clear error rather than leaking the bare
                # StopIteration that next(iter(...)) raises on empty input.
                raise ValueError(
                    "At least one connection is required to select from")
            lead_connection = first_connection
        self._lead_connection = lead_connection

    # Selection order: a connection registered for the message's
    # destination chip; else the lead connection when no machine is
    # available or only one connection is known; else the connection
    # registered under the key returned by
    # SpiNNManDataView.get_nearest_ethernet for the destination, falling
    # back to the lead connection when that key has no connection.
    @overrides(ConnectionSelector.get_next_connection)
    def get_next_connection(
            self, message: AbstractSCPRequest) -> SCAMPConnection:
        key = (message.sdp_header.destination_chip_x,
               message.sdp_header.destination_chip_y)
        if key in self._connections:
            return self._connections[key]

        # With no machine, or a single connection, there is nothing to
        # choose between, so skip the nearest-ethernet lookup.
        if not SpiNNManDataView.has_machine() or len(self._connections) == 1:
            return self._lead_connection

        x, y = key
        key = SpiNNManDataView.get_nearest_ethernet(x, y)
        return self._connections.get(key, self._lead_connection)