diff --git a/node.py b/node.py index fa410ef..709e649 100755 --- a/node.py +++ b/node.py @@ -1,6 +1,6 @@ #! /usr/bin/env python3 -import hashlib, socket, sys, threading, time +import hashlib, random, socket, sys, threading, time DEFAULT_PORT = 62039 @@ -9,17 +9,47 @@ class Peer: self.ipv6 = ipv6 self.port = port self.lifetime_counter = lifetime_counter + self.partner = None + def get_address(self): + return self.ipv6, self.port + def __str__(self): + return describe(self.ipv6, self.port) class Node: def __init__(self, node_socket, peers): + self.lock = threading.Lock() self.node_socket = node_socket self.peers = peers + self.incoming_heartbeats = [] + self.partners = [] + def add_heartbeat(self, address): + with self.lock: + self.incoming_heartbeats.append(address) + def add_partner(self, address): + with self.lock: + self.partners.append(address) + def get_events(self): + with self.lock: + heartbeats = self.incoming_heartbeats + partners = self.partners + self.incoming_heartbeats = [] + self.partners = [] + return heartbeats, partners + +def describe(ipv6, port): + if port == DEFAULT_PORT: + return ipv6 + return f"[{ipv6}]:{port}" + +def log(msg): + time_info = time.strftime("%d.%m.%Y %H:%M:%S") + print(f"[{time_info}] {msg}") def parse_address(addr_str): - if addr_str.startswith('['): - closing = addr_str.find(']:') + if addr_str.startswith("["): + closing = addr_str.find("]:") if closing == -1: - raise Exception(f'Not a valid address: {addr_str}') + raise Exception(f"Not a valid address: {addr_str}") ipv6 = addr_str[1:closing] port = int(addr_str[closing+2:]) else: @@ -45,23 +75,108 @@ def send_heartbeat(node, peer): msg_type = b"\0" difficulty_sum = 32 * b"\0" hash_value = empty_transaction_list_hash() - partner_ipv6 = 16 * b"\0" - partner_port = 2 * b"\0" + + if peer.partner is None: + partner_ipv6 = 16 * b"\0" + partner_port = 2 * b"\0" + else: + partner_ipv6 = socket.inet_pton(socket.AF_INET6, peer.partner.ipv6) + partner_port = peer.partner.port.to_bytes(2, "big") heartbeat_msg = protocol_version + capable_version + msg_type + \ difficulty_sum + hash_value + partner_ipv6 + partner_port node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port)) +def define_partnership(peers): + peers_to_pair = [peer for peer in peers if peer.lifetime_counter >= 8] + random.shuffle(peers_to_pair) + pairing_count = len(peers_to_pair) // 2 + for pair_idx in range(pairing_count): + peers_to_pair[2*pair_idx].partner = peers_to_pair[2*pair_idx+1] + peers_to_pair[2*pair_idx+1].partner = peers_to_pair[2*pair_idx] + # in case of an odd count, the last one will remain without partner + if pairing_count % 2 == 1: + peers_to_pair[-1].partner = None + def heartbeat(node): while True: + heartbeats, partners = node.get_events() + heartbeats = set(heartbeats) + partners = set(partners) + peerlist_update = False + for peer in node.peers: + address = peer.get_address() + if address in heartbeats: + peer.lifetime_counter = 10 + heartbeats.remove(address) + else: + peer.lifetime_counter -= 1 + if peer.lifetime_counter < 0: + log(f"Removing {peer} from the list (no contact)") + peerlist_update = True + if address in partners: + partners.remove(address) + + # cleaning up inactives + node.peers = [peer for peer in node.peers if peer.lifetime_counter >= 0] + + # adding new peers that contacted me + for new_peer in (heartbeats - partners): + peer = Peer(new_peer[0], new_peer[1], 10) + log(f"Adding peer {peer} to the list (It connected on its own)") + node.peers.append(peer) + peerlist_update = True + + # adding partners + for new_peer in partners: + peer = Peer(new_peer[0], new_peer[1], 3) + log(f"Adding peer {peer} to the list (Got it as \"partner\" from another peer)") + node.peers.append(peer) + peerlist_update = True + peer_count = len(node.peers) + if peerlist_update: + if peer_count == 1: + log(f"There is now 1 peer in my list.") + else: + log(f"There are now {peer_count} peers in my list.") start_time = time.time() + define_partnership(node.peers) for i, peer in enumerate(node.peers): wait_until(start_time + 60 * (i+1) / peer_count) send_heartbeat(node, peer) def receiver(node): - pass + while True: + msg, addr = node.node_socket.recvfrom(4096) + sender = describe(addr[0], addr[1]) + msg_len = len(msg) + if msg_len < 4: + log("Got a udp message from {sender} that was too short.") + continue + version = int.from_bytes(msg[0:2], "big") + if version != 0: + log(f"Got a udp message of version {version} from {sender}, but only version 0 is supported.") + continue + if msg_len < 5: + log(f"Got a udp message from {sender} that was too short. (missing type field)") + continue + msg_type = msg[4] + if msg_type == 0: + # heartbeat + if msg_len != 87: + log(f"Got a heartbeat message of wrong length ({msg_len} bytes from {sender}, but expected 87 bytes)") + continue + # register heartbeat of the sender + node.add_heartbeat(addr[0:2]) + partner_info = msg[69:87] + if partner_info != 18 * b"\0": + # register the partner that was sent along + partner_ip = socket.inet_ntop(socket.AF_INET6, partner_info[0:16]) + partner_port = int.from_bytes(partner_info[16:18], "big") + node.add_partner((partner_ip, partner_port)) + else: + log(f"Got a udp message of unknown type from {sender}. (type {msg_type})") def main(): address_arguments = sys.argv[1:] @@ -74,9 +189,11 @@ def main(): if e.errno == 98: node_socket.bind(("::", 0)) port = node_socket.getsockname()[1] - print(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.") + log(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.") else: raise e + + log("Node is ready") node = Node(node_socket, peers) heartbeat_thread = threading.Thread(target = heartbeat, args = (node,))