Files
carrotcoin/observer.py

32 lines
1.1 KiB
Python

from multiprocessing import Lock, Queue
class Observer:
def __init__(self):
self.__receivers_list = {}
self.__lock = Lock()
def listen(self, identifier):
with self.__lock:
queue = Queue()
self.__receivers_list.setdefault(identifier, set())
self.__receivers_list[identifier].add(queue)
return Subscription(self, identifier, queue)
def publish(self, identifier, message):
if identifier in self.__receivers_list:
for queue in self.__receivers_list[identifier]:
queue.put(message)
def quit(self, identifer, queue):
with self.__lock:
self.__receivers_list[identifer].remove(queue)
if len(self.__receivers_list[identifer]) == 0:
del self.__receivers_list[identifer]
class Subscription:
def __init__(self, observer, identifier, queue):
self.__observer = observer
self.__identifier = identifier
self.__queue = queue
def receive(self, timeout):
return self.__queue.get(timeout=timeout)
def __del__(self):
self.__observer.quit(self.__identifier, self.__queue)