Source code for spinnman.connections.listeners.queuers.abstract_port_queuer
import threading
import collections
import logging
from abc import ABCMeta
from abc import abstractmethod
from six import add_metaclass
import socket
logger = logging.getLogger(__name__)
@add_metaclass(ABCMeta)
[docs]class AbstractPortQueuer(threading.Thread):
""" A Queue for packets received
"""
def __init__(self):
threading.Thread.__init__(self)
self._queue = collections.deque()
self._queue_condition = threading.Condition()
self._done = False
self.setDaemon(True)
[docs] def stop(self):
""" Stop the thread
"""
logger.debug("[Queuer] Stopping")
self._queue_condition.acquire()
self._done = True
self._queue_condition.notify_all()
self._queue_condition.release()
[docs] def run(self):
logger.debug("[Queuer] Starting")
while not self._done:
try:
packet = self._read_packet()
self._queue_condition.acquire()
self._queue.append(packet)
self._queue_condition.notify_all()
self._queue_condition.release()
except socket.timeout:
pass
except Exception as e:
if not self._done:
raise e
self._queue_condition.acquire()
self._queue_condition.notify_all()
self._queue_condition.release()
@abstractmethod
def _read_packet(self):
""" Read a packet to be added to the queue
:return: The read packet
:raise SpinnmanIOException: If the packet could not be read
:raise socket.timeout: If there is a timeout on reading
"""
[docs] def get_packet(self):
""" Get the next packet from the queue
:return: The next packet, or None if the queue has been stopped
"""
self._queue_condition.acquire()
while len(self._queue) == 0 and not self._done:
self._queue_condition.wait()
packet = None
if not self._done:
packet = self._queue.popleft()
self._queue_condition.release()
return packet