Implement block transfer logic
This commit is contained in:
31
observer.py
Normal file
31
observer.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from multiprocessing import Lock, Queue
|
||||
|
||||
class Observer:
|
||||
def __init__(self):
|
||||
self.__receivers_list = {}
|
||||
self.__lock = Lock()
|
||||
def listen(self, identifier):
|
||||
with self.__lock:
|
||||
queue = Queue()
|
||||
self.__receivers_list.setdefault(identifier, set())
|
||||
self.__receivers_list[identifier].add(queue)
|
||||
return Subscription(self, identifier, queue)
|
||||
def publish(self, identifier, message):
|
||||
if identifier in self.__receivers_list:
|
||||
for queue in self.__receivers_list[identifier]:
|
||||
queue.put(message)
|
||||
def quit(self, identifer, queue):
|
||||
with self.__lock:
|
||||
self.__receivers_list[identifer].remove(queue)
|
||||
if len(self.__receivers_list[identifer]) == 0:
|
||||
del self.__receivers_list[identifer]
|
||||
|
||||
class Subscription:
|
||||
def __init__(self, observer, identifier, queue):
|
||||
self.__observer = observer
|
||||
self.__identifier = identifier
|
||||
self.__queue = queue
|
||||
def receive(self, timeout):
|
||||
return self.__queue.get(timeout=timeout)
|
||||
def __del__(self):
|
||||
self.__observer.quit(self.__identifier, self.__queue)
|
||||
Reference in New Issue
Block a user