Source code for spinnman.connections.connection_listener

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)
_POOL_SIZE = 4
_TIMEOUT = 1


[docs]class ConnectionListener(Thread): """ Thread that listens to a connection and calls callbacks with new\ messages when they arrive. """ __slots__ = [ "_callback_pool", "_callbacks", "_connection", "_done", "_timeout"] def __init__(self, connection, n_processes=_POOL_SIZE, timeout=_TIMEOUT): """ :param connection: An AbstractListenable connection to listen to :param n_processes: \ The number of threads to use when calling callbacks :param timeout: How long to wait for messages before checking to see\ if the connection is to be terminated. """ super(ConnectionListener, self).__init__( name="Connection listener for connection {}".format(connection)) self.daemon = True self._connection = connection self._timeout = timeout self._callback_pool = ThreadPoolExecutor(max_workers=n_processes) self._done = False self._callbacks = set() def _run_step(self, handler): if self._connection.is_ready_to_receive(timeout=self._timeout): message = handler() for callback in self._callbacks: self._callback_pool.submit(callback, message)
[docs] def run(self): try: handler = self._connection.get_receive_method() while not self._done: try: self._run_step(handler) except Exception: if not self._done: logger.warning("problem when dispatching message", exc_info=True) finally: self._callback_pool.shutdown() self._callback_pool = None
[docs] def add_callback(self, callback): """ Add a callback to be called when a message is received :param callback: A callable which takes a single parameter, which is\ the message received :type callback: callable (connection message type -> None) """ self._callbacks.add(callback)
[docs] def close(self): """ Closes the listener. Note that this does not close the provider\ of the messages; this instead marks the listener as closed. The\ listener will not truly stop until the get message call returns. """ self._done = True self.join()