Implement the partner introduction logic
This commit is contained in:
133
node.py
133
node.py
@@ -1,6 +1,6 @@
|
|||||||
#! /usr/bin/env python3
|
#! /usr/bin/env python3
|
||||||
|
|
||||||
import hashlib, socket, sys, threading, time
|
import hashlib, random, socket, sys, threading, time
|
||||||
|
|
||||||
DEFAULT_PORT = 62039
|
DEFAULT_PORT = 62039
|
||||||
|
|
||||||
@@ -9,17 +9,47 @@ class Peer:
|
|||||||
self.ipv6 = ipv6
|
self.ipv6 = ipv6
|
||||||
self.port = port
|
self.port = port
|
||||||
self.lifetime_counter = lifetime_counter
|
self.lifetime_counter = lifetime_counter
|
||||||
|
self.partner = None
|
||||||
|
def get_address(self):
|
||||||
|
return self.ipv6, self.port
|
||||||
|
def __str__(self):
|
||||||
|
return describe(self.ipv6, self.port)
|
||||||
|
|
||||||
class Node:
|
class Node:
|
||||||
def __init__(self, node_socket, peers):
|
def __init__(self, node_socket, peers):
|
||||||
|
self.lock = threading.Lock()
|
||||||
self.node_socket = node_socket
|
self.node_socket = node_socket
|
||||||
self.peers = peers
|
self.peers = peers
|
||||||
|
self.incoming_heartbeats = []
|
||||||
|
self.partners = []
|
||||||
|
def add_heartbeat(self, address):
|
||||||
|
with self.lock:
|
||||||
|
self.incoming_heartbeats.append(address)
|
||||||
|
def add_partner(self, address):
|
||||||
|
with self.lock:
|
||||||
|
self.partners.append(address)
|
||||||
|
def get_events(self):
|
||||||
|
with self.lock:
|
||||||
|
heartbeats = self.incoming_heartbeats
|
||||||
|
partners = self.partners
|
||||||
|
self.incoming_heartbeats = []
|
||||||
|
self.partners = []
|
||||||
|
return heartbeats, partners
|
||||||
|
|
||||||
|
def describe(ipv6, port):
|
||||||
|
if port == DEFAULT_PORT:
|
||||||
|
return ipv6
|
||||||
|
return f"[{ipv6}]:{port}"
|
||||||
|
|
||||||
|
def log(msg):
|
||||||
|
time_info = time.strftime("%d.%m.%Y %H:%M:%S")
|
||||||
|
print(f"[{time_info}] {msg}")
|
||||||
|
|
||||||
def parse_address(addr_str):
|
def parse_address(addr_str):
|
||||||
if addr_str.startswith('['):
|
if addr_str.startswith("["):
|
||||||
closing = addr_str.find(']:')
|
closing = addr_str.find("]:")
|
||||||
if closing == -1:
|
if closing == -1:
|
||||||
raise Exception(f'Not a valid address: {addr_str}')
|
raise Exception(f"Not a valid address: {addr_str}")
|
||||||
ipv6 = addr_str[1:closing]
|
ipv6 = addr_str[1:closing]
|
||||||
port = int(addr_str[closing+2:])
|
port = int(addr_str[closing+2:])
|
||||||
else:
|
else:
|
||||||
@@ -45,23 +75,108 @@ def send_heartbeat(node, peer):
|
|||||||
msg_type = b"\0"
|
msg_type = b"\0"
|
||||||
difficulty_sum = 32 * b"\0"
|
difficulty_sum = 32 * b"\0"
|
||||||
hash_value = empty_transaction_list_hash()
|
hash_value = empty_transaction_list_hash()
|
||||||
partner_ipv6 = 16 * b"\0"
|
|
||||||
partner_port = 2 * b"\0"
|
if peer.partner is None:
|
||||||
|
partner_ipv6 = 16 * b"\0"
|
||||||
|
partner_port = 2 * b"\0"
|
||||||
|
else:
|
||||||
|
partner_ipv6 = socket.inet_pton(socket.AF_INET6, peer.partner.ipv6)
|
||||||
|
partner_port = peer.partner.port.to_bytes(2, "big")
|
||||||
|
|
||||||
heartbeat_msg = protocol_version + capable_version + msg_type + \
|
heartbeat_msg = protocol_version + capable_version + msg_type + \
|
||||||
difficulty_sum + hash_value + partner_ipv6 + partner_port
|
difficulty_sum + hash_value + partner_ipv6 + partner_port
|
||||||
node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port))
|
node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port))
|
||||||
|
|
||||||
|
def define_partnership(peers):
|
||||||
|
peers_to_pair = [peer for peer in peers if peer.lifetime_counter >= 8]
|
||||||
|
random.shuffle(peers_to_pair)
|
||||||
|
pairing_count = len(peers_to_pair) // 2
|
||||||
|
for pair_idx in range(pairing_count):
|
||||||
|
peers_to_pair[2*pair_idx].partner = peers_to_pair[2*pair_idx+1]
|
||||||
|
peers_to_pair[2*pair_idx+1].partner = peers_to_pair[2*pair_idx]
|
||||||
|
# in case of an odd count, the last one will remain without partner
|
||||||
|
if pairing_count % 2 == 1:
|
||||||
|
peers_to_pair[-1].partner = None
|
||||||
|
|
||||||
def heartbeat(node):
|
def heartbeat(node):
|
||||||
while True:
|
while True:
|
||||||
|
heartbeats, partners = node.get_events()
|
||||||
|
heartbeats = set(heartbeats)
|
||||||
|
partners = set(partners)
|
||||||
|
peerlist_update = False
|
||||||
|
for peer in node.peers:
|
||||||
|
address = peer.get_address()
|
||||||
|
if address in heartbeats:
|
||||||
|
peer.lifetime_counter = 10
|
||||||
|
heartbeats.remove(address)
|
||||||
|
else:
|
||||||
|
peer.lifetime_counter -= 1
|
||||||
|
if peer.lifetime_counter < 0:
|
||||||
|
log(f"Removing {peer} from the list (no contact)")
|
||||||
|
peerlist_update = True
|
||||||
|
if address in partners:
|
||||||
|
partners.remove(address)
|
||||||
|
|
||||||
|
# cleaning up inactives
|
||||||
|
node.peers = [peer for peer in node.peers if peer.lifetime_counter >= 0]
|
||||||
|
|
||||||
|
# adding new peers that contacted me
|
||||||
|
for new_peer in (heartbeats - partners):
|
||||||
|
peer = Peer(new_peer[0], new_peer[1], 10)
|
||||||
|
log(f"Adding peer {peer} to the list (It connected on its own)")
|
||||||
|
node.peers.append(peer)
|
||||||
|
peerlist_update = True
|
||||||
|
|
||||||
|
# adding partners
|
||||||
|
for new_peer in partners:
|
||||||
|
peer = Peer(new_peer[0], new_peer[1], 3)
|
||||||
|
log(f"Adding peer {peer} to the list (Got it as \"partner\" from another peer)")
|
||||||
|
node.peers.append(peer)
|
||||||
|
peerlist_update = True
|
||||||
|
|
||||||
peer_count = len(node.peers)
|
peer_count = len(node.peers)
|
||||||
|
if peerlist_update:
|
||||||
|
if peer_count == 1:
|
||||||
|
log(f"There is now 1 peer in my list.")
|
||||||
|
else:
|
||||||
|
log(f"There are now {peer_count} peers in my list.")
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
define_partnership(node.peers)
|
||||||
for i, peer in enumerate(node.peers):
|
for i, peer in enumerate(node.peers):
|
||||||
wait_until(start_time + 60 * (i+1) / peer_count)
|
wait_until(start_time + 60 * (i+1) / peer_count)
|
||||||
send_heartbeat(node, peer)
|
send_heartbeat(node, peer)
|
||||||
|
|
||||||
def receiver(node):
|
def receiver(node):
|
||||||
pass
|
while True:
|
||||||
|
msg, addr = node.node_socket.recvfrom(4096)
|
||||||
|
sender = describe(addr[0], addr[1])
|
||||||
|
msg_len = len(msg)
|
||||||
|
if msg_len < 4:
|
||||||
|
log("Got a udp message from {sender} that was too short.")
|
||||||
|
continue
|
||||||
|
version = int.from_bytes(msg[0:2], "big")
|
||||||
|
if version != 0:
|
||||||
|
log(f"Got a udp message of version {version} from {sender}, but only version 0 is supported.")
|
||||||
|
continue
|
||||||
|
if msg_len < 5:
|
||||||
|
log(f"Got a udp message from {sender} that was too short. (missing type field)")
|
||||||
|
continue
|
||||||
|
msg_type = msg[4]
|
||||||
|
if msg_type == 0:
|
||||||
|
# heartbeat
|
||||||
|
if msg_len != 87:
|
||||||
|
log(f"Got a heartbeat message of wrong length ({msg_len} bytes from {sender}, but expected 87 bytes)")
|
||||||
|
continue
|
||||||
|
# register heartbeat of the sender
|
||||||
|
node.add_heartbeat(addr[0:2])
|
||||||
|
partner_info = msg[69:87]
|
||||||
|
if partner_info != 18 * b"\0":
|
||||||
|
# register the partner that was sent along
|
||||||
|
partner_ip = socket.inet_ntop(socket.AF_INET6, partner_info[0:16])
|
||||||
|
partner_port = int.from_bytes(partner_info[16:18], "big")
|
||||||
|
node.add_partner((partner_ip, partner_port))
|
||||||
|
else:
|
||||||
|
log(f"Got a udp message of unknown type from {sender}. (type {msg_type})")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
address_arguments = sys.argv[1:]
|
address_arguments = sys.argv[1:]
|
||||||
@@ -74,9 +189,11 @@ def main():
|
|||||||
if e.errno == 98:
|
if e.errno == 98:
|
||||||
node_socket.bind(("::", 0))
|
node_socket.bind(("::", 0))
|
||||||
port = node_socket.getsockname()[1]
|
port = node_socket.getsockname()[1]
|
||||||
print(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.")
|
log(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.")
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
log("Node is ready")
|
||||||
|
|
||||||
node = Node(node_socket, peers)
|
node = Node(node_socket, peers)
|
||||||
heartbeat_thread = threading.Thread(target = heartbeat, args = (node,))
|
heartbeat_thread = threading.Thread(target = heartbeat, args = (node,))
|
||||||
|
|||||||
Reference in New Issue
Block a user