from multiprocessing import Lock, Queue


class Observer:
    """Minimal publish/subscribe hub keyed by an arbitrary hashable identifier.

    Each call to :meth:`listen` creates a dedicated queue for the subscriber;
    :meth:`publish` puts the message on every queue currently registered for
    the identifier.

    The registry itself is a plain ``dict`` guarded by a lock, so every access
    to it (including reads in :meth:`publish`) goes through that lock.
    """

    def __init__(self):
        # Maps identifier -> set of queues subscribed to that identifier.
        self.__receivers_list = {}
        self.__lock = Lock()

    def listen(self, identifier):
        """Subscribe to *identifier* and return a :class:`Subscription`.

        Args:
            identifier: Hashable key naming the channel to subscribe to.

        Returns:
            A ``Subscription`` whose ``receive`` yields messages published to
            *identifier* after this call.
        """
        # Build the queue before taking the lock to keep the critical
        # section as short as possible.
        queue = Queue()
        with self.__lock:
            self.__receivers_list.setdefault(identifier, set()).add(queue)
        return Subscription(self, identifier, queue)

    def publish(self, identifier, message):
        """Deliver *message* to every queue subscribed to *identifier*.

        Publishing to an identifier without subscribers is a no-op.

        Args:
            identifier: Channel key the message is addressed to.
            message: Object to put on each subscriber queue.
        """
        # Snapshot under the lock: iterating the live set while another
        # thread subscribes/unsubscribes would raise
        # "RuntimeError: Set changed size during iteration".
        with self.__lock:
            receivers = tuple(self.__receivers_list.get(identifier, ()))
        # Deliver outside the lock so a put never delays listen()/quit().
        for queue in receivers:
            queue.put(message)

    def quit(self, identifer, queue):
        """Remove *queue* from the subscribers of *identifer*.

        The call is idempotent: an unknown identifier or a queue that was
        already removed is ignored rather than raising ``KeyError``. This
        matters because the method is invoked from ``Subscription.__del__``.

        Args:
            identifer: Channel key the queue was registered under. (The
                spelling is kept for compatibility with keyword callers.)
            queue: The queue object previously handed out by ``listen``.
        """
        with self.__lock:
            receivers = self.__receivers_list.get(identifer)
            if receivers is None:
                return
            receivers.discard(queue)
            # Drop empty channels so the registry does not grow unbounded.
            if not receivers:
                del self.__receivers_list[identifer]


class Subscription:
    """Handle for one subscriber queue registered with an :class:`Observer`.

    The subscription unregisters itself when :meth:`close` is called, when it
    is used as a context manager and the ``with`` block exits, or, as a last
    resort, when the object is finalized.
    """

    def __init__(self, observer, identifier, queue):
        self.__observer = observer
        self.__identifier = identifier
        self.__queue = queue

    def receive(self, timeout=None):
        """Return the next message, waiting up to *timeout* seconds.

        Args:
            timeout: Passed to the queue's ``get(timeout=...)``; ``None``
                (the default) waits without a time limit.

        Returns:
            The next message taken from this subscription's queue.
        """
        return self.__queue.get(timeout=timeout)

    def close(self):
        """Unregister from the observer; safe to call more than once."""
        # Look the attribute up by its mangled name so this also works when
        # __init__ never ran (close() is reachable from __del__).
        observer = getattr(self, "_Subscription__observer", None)
        if observer is None:
            return
        # Clear the reference first so a repeated call becomes a no-op.
        self.__observer = None
        observer.quit(self.__identifier, self.__queue)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Fallback cleanup for callers that never call close().
        self.close()