Source code for spinnman.connections.connection_listener
import logging
from threading import Thread
from multiprocessing.pool import ThreadPool
logger = logging.getLogger(__name__)
[docs]class ConnectionListener(Thread):
""" Listens to a connection and calls callbacks with new messages when \
they arrive
"""
def __init__(self, connection, n_processes=4):
"""
:param connection: An AbstractListenable connection to listen to
:param n_processes: The number of threads to use when calling\
callbacks
"""
Thread.__init__(
self,
name="Connection listener for connection {}".format(connection))
self._connection = connection
self._get_message_call = connection.get_receive_method()
self._callback_pool = ThreadPool(processes=n_processes)
self._done = False
self._callbacks = set()
self.setDaemon(True)
def _run_step(self):
if self._connection.is_ready_to_receive(timeout=1):
message = self._get_message_call()
for callback in self._callbacks:
self._callback_pool.apply_async(callback, [message])
[docs] def run(self):
while not self._done:
try:
self._run_step()
except:
if not self._done:
logger.warn("problem when dispatching message",
exc_info=True)
self._callback_pool.close()
self._callback_pool.join()
[docs] def add_callback(self, callback):
""" Add a callback to be called when a message is received
:param callback: A callable which takes a single parameter, which is\
the message received
"""
self._callbacks.add(callback)
[docs] def close(self):
""" Closes the listener. Note that this does not close the provider\
of the messages; this instead marks the listener as closed. The\
listener will not truly stop until the get message call returns.
"""
self._done = True
self.join()