32 lines
1.1 KiB
Python
32 lines
1.1 KiB
Python
from multiprocessing import Lock, Queue
|
|
|
|
class Observer:
    """Fan-out message hub keyed by an identifier.

    Each call to :meth:`listen` registers a fresh queue under an identifier;
    :meth:`publish` puts a message on every queue currently registered under
    that identifier.  All access to the registry goes through one lock.
    """

    def __init__(self):
        # Maps identifier -> set of queues registered for that identifier.
        self.__receivers_list = {}
        # Guards every read and write of the registry above.
        self.__lock = Lock()

    def listen(self, identifier):
        """Register a new receiver for *identifier*.

        Returns:
            A ``Subscription`` bound to this observer, the identifier and the
            newly created queue.
        """
        # Build the queue before taking the lock so the critical section only
        # covers the registry mutation.
        receiver = Queue()
        with self.__lock:
            self.__receivers_list.setdefault(identifier, set()).add(receiver)
        return Subscription(self, identifier, receiver)

    def publish(self, identifier, message):
        """Put *message* on every queue registered under *identifier*.

        Publishing to an identifier nobody listens to is a no-op.
        """
        # Snapshot under the lock: iterating the live set while another
        # caller runs listen()/quit() would raise "Set changed size during
        # iteration".  Delivery itself happens outside the lock.
        with self.__lock:
            targets = tuple(self.__receivers_list.get(identifier, ()))
        for target in targets:
            target.put(message)

    def quit(self, identifer, queue):
        """Remove *queue* from the receivers of *identifer*.

        Removing an unknown identifier or an already-removed queue is a
        no-op, so the call is safe to repeat (it is invoked from a
        finalizer, where a KeyError could not be handled by anyone).
        The identifier entry is dropped once its last queue is removed.
        """
        with self.__lock:
            receivers = self.__receivers_list.get(identifer)
            if receivers is None:
                return
            receivers.discard(queue)
            if not receivers:
                del self.__receivers_list[identifer]
|
|
|
|
class Subscription:
    """Handle for one receiver queue registered with an observer.

    The subscription unregisters itself from the observer when it is closed.
    Closing can be done explicitly with :meth:`close`, by using the object as
    a context manager, or (as a last resort) when the object is finalized.
    """

    def __init__(self, observer, identifier, queue):
        """Bind the subscription to its observer, identifier and queue.

        Args:
            observer: Object exposing ``quit(identifier, queue)``.
            identifier: Key the queue was registered under.
            queue: Object exposing ``get(timeout=...)`` that receives the
                published messages.
        """
        self.__observer = observer
        self.__identifier = identifier
        self.__queue = queue
        # Ensures the observer is told to drop the queue at most once.
        self.__closed = False

    def receive(self, timeout=None):
        """Return the next message from the queue.

        Args:
            timeout: Passed straight through as ``get(timeout=timeout)``.
                With standard-library queues, ``None`` blocks until a message
                arrives and a number raises ``queue.Empty`` once that many
                seconds pass without one.
        """
        return self.__queue.get(timeout=timeout)

    def close(self):
        """Unregister from the observer; repeated calls do nothing."""
        if self.__closed:
            return
        # Flip the flag first so a failing quit() is not retried from __del__.
        self.__closed = True
        self.__observer.quit(self.__identifier, self.__queue)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Fallback only: finalizer timing is not guaranteed, so callers that
        # need deterministic cleanup should call close() or use ``with``.
        self.close()
|