#! /usr/bin/env python3
"""UDP peer-to-peer node for a small blockchain network.

The node runs two threads:

* ``heartbeat`` keeps the peer list up to date and periodically sends a
  heartbeat message to every known peer.
* ``receiver`` reads datagrams from the socket and answers or dispatches them
  by message type (heartbeat, block request/transfer, open transaction
  synchronisation, mining task request, payment).

Every message starts with two 2-byte version fields followed by a 1-byte
message type.
"""

import errno
import hashlib
import random
import socket
import sys
import threading
import time
from queue import Empty

import blockchain
import observer

DEFAULT_PORT = 62039


class Peer:
    """A remote node, identified by its IPv6 address string and UDP port."""

    def __init__(self, ipv6, port, lifetime_counter):
        self.ipv6 = ipv6
        self.port = port
        # Decremented by the heartbeat loop on every round without contact;
        # the peer is dropped once the counter falls below zero.
        self.lifetime_counter = lifetime_counter
        # Another Peer that is advertised to this peer in heartbeats, or None.
        self.partner = None

    def get_address(self):
        """Return the ``(ipv6, port)`` tuple identifying this peer."""
        return self.ipv6, self.port

    def __str__(self):
        return describe(self.ipv6, self.port)


class Node:
    """Shared state between the receiver thread and the heartbeat thread."""

    def __init__(self, node_socket, peers):
        self.lock = threading.Lock()
        self.node_socket = node_socket
        self.peers = peers
        self.incoming_heartbeats = []
        self.partners = []

    def add_heartbeat(self, address):
        """Record that a heartbeat arrived from ``address``."""
        with self.lock:
            self.incoming_heartbeats.append(address)

    def add_partner(self, address):
        """Record a partner ``address`` that was advertised in a heartbeat."""
        with self.lock:
            self.partners.append(address)

    def get_events(self):
        """Return and clear the collected ``(heartbeats, partners)`` lists."""
        with self.lock:
            heartbeats = self.incoming_heartbeats
            partners = self.partners
            self.incoming_heartbeats = []
            self.partners = []
        return heartbeats, partners


def is_loopback(ipv6):
    """Return True if the packed 16-byte address is ``::1``."""
    return ipv6 == 15 * b"\0" + b"\x01"


def is_unspecified(ipv6):
    """Return True if the packed 16-byte address is ``::``."""
    return ipv6 == 16 * b"\0"


def is_mapped_ipv4(ipv6):
    """Return True if the packed address has the ``::ffff:0:0/96`` prefix."""
    return ipv6[0:12] == 10 * b"\0" + 2 * b"\xff"


def is_local_unicast(ipv6):
    """Return True if the packed address lies in ``fc00::/7``."""
    return (ipv6[0] & 0xfe) == 0xfc


def is_valid_address(ipv6):
    """Return True if the textual IPv6 address is acceptable for a peer.

    Loopback, unspecified, IPv4-mapped and unique-local addresses are
    rejected. A string that ``inet_pton`` cannot parse is rejected as well
    instead of raising, so a single odd sender cannot stop the heartbeat
    thread.
    """
    try:
        ipv6 = socket.inet_pton(socket.AF_INET6, ipv6)
    except OSError:
        # NOTE(review): recvfrom() presumably reports link-local senders with
        # a zone suffix ("fe80::1%eth0"), which inet_pton rejects - confirm.
        return False
    return not is_loopback(ipv6) and \
        not is_unspecified(ipv6) and \
        not is_mapped_ipv4(ipv6) and \
        not is_local_unicast(ipv6)


def describe(ipv6, port):
    """Format an address for display, omitting the port if it is the default."""
    if port == DEFAULT_PORT:
        return ipv6
    return f"[{ipv6}]:{port}"


def log(msg):
    """Print ``msg`` prefixed with the current local date and time."""
    time_info = time.strftime("%d.%m.%Y %H:%M:%S")
    print(f"[{time_info}] {msg}")


def parse_address(addr_str):
    """Parse ``"ipv6"`` or ``"[ipv6]:port"`` into a Peer with lifetime 10.

    Raises:
        ValueError: if a bracketed address lacks the ``]:`` separator or the
            port is not an integer.
    """
    if addr_str.startswith("["):
        closing = addr_str.find("]:")
        if closing == -1:
            raise ValueError(f"Not a valid address: {addr_str}")
        ipv6 = addr_str[1:closing]
        port = int(addr_str[closing+2:])
    else:
        ipv6 = addr_str
        port = DEFAULT_PORT
    return Peer(ipv6, port, 10)


def wait_until(target_time):
    """Sleep until ``time.time()`` reaches ``target_time`` (no-op if past)."""
    duration = target_time - time.time()
    if duration > 0:
        time.sleep(duration)


def send_heartbeat(node, peer, b):
    """Send one heartbeat datagram to ``peer``.

    The message consists of the two version fields, the message type 0, the
    second-last difficulty sum (32 bytes), the open transaction list hash at
    position 1023 and the partner's address and port (all zero if the peer
    has no partner).
    """
    protocol_version = 2 * b"\0"
    capable_version = 2 * b"\0"
    msg_type = b"\0"
    difficulty_sum = b.get_second_last_difficulty_sum().to_bytes(32, "big")
    hash_value = b.open_transactions.get_hash(1023)
    if peer.partner is None:
        partner_ipv6 = 16 * b"\0"
        partner_port = 2 * b"\0"
    else:
        partner_ipv6 = socket.inet_pton(socket.AF_INET6, peer.partner.ipv6)
        partner_port = peer.partner.port.to_bytes(2, "big")
    heartbeat_msg = protocol_version + capable_version + msg_type + \
        difficulty_sum + hash_value + partner_ipv6 + partner_port
    node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port))


def define_partnership(peers):
    """Randomly pair up all peers whose lifetime counter is at least 8.

    Each paired peer gets the other one as its ``partner``. With an odd number
    of eligible peers, the one left over gets no partner.
    """
    peers_to_pair = [peer for peer in peers if peer.lifetime_counter >= 8]
    random.shuffle(peers_to_pair)
    pairing_count = len(peers_to_pair) // 2
    for pair_idx in range(pairing_count):
        peers_to_pair[2*pair_idx].partner = peers_to_pair[2*pair_idx+1]
        peers_to_pair[2*pair_idx+1].partner = peers_to_pair[2*pair_idx]
    # in case of an odd count, the last one will remain without partner
    # (the parity of the peer count matters here, not of the pair count)
    if len(peers_to_pair) % 2 == 1:
        peers_to_pair[-1].partner = None


def heartbeat(node, b):
    """Maintain the peer list and send heartbeats; runs forever.

    Each round processes the events collected by the receiver, drops peers
    that have been silent for too long, adds newly seen peers and advertised
    partners, and then sends the heartbeats to all peers spread evenly over
    60 seconds.
    """
    while True:
        heartbeats, partners = node.get_events()
        heartbeats = set(heartbeats)
        partners = set(partners)
        peerlist_update = False
        for peer in node.peers:
            address = peer.get_address()
            if address in heartbeats:
                peer.lifetime_counter = 10
                heartbeats.remove(address)
            else:
                peer.lifetime_counter -= 1
                if peer.lifetime_counter < 0:
                    log(f"Removing {peer} from the list (no contact)")
                    peerlist_update = True
            # already known peers must not be added again as partners
            if address in partners:
                partners.remove(address)
        # cleaning up inactives
        node.peers = [peer for peer in node.peers if peer.lifetime_counter >= 0]
        # adding new peers that contacted me
        for new_peer in (heartbeats - partners):
            if not is_valid_address(new_peer[0]):
                log(f"Ignoring this address: {new_peer[0]}")
                continue
            peer = Peer(new_peer[0], new_peer[1], 10)
            log(f"Adding peer {peer} to the list (It connected on its own)")
            node.peers.append(peer)
            peerlist_update = True
        # adding partners
        for new_peer in partners:
            if not is_valid_address(new_peer[0]):
                log(f"Ignoring this address: {new_peer[0]}")
                continue
            # partners start with a short lifetime until they answer themselves
            peer = Peer(new_peer[0], new_peer[1], 3)
            log(f"Adding peer {peer} to the list (Got it as \"partner\" from another peer)")
            node.peers.append(peer)
            peerlist_update = True
        peer_count = len(node.peers)
        if peerlist_update:
            if peer_count == 1:
                log("There is now 1 peer in my list.")
            else:
                log(f"There are now {peer_count} peers in my list.")
        start_time = time.time()
        define_partnership(node.peers)
        for i, peer in enumerate(node.peers):
            wait_until(start_time + 60 * (i+1) / peer_count)
            send_heartbeat(node, peer, b)
        if len(node.peers) == 0:
            # nothing was sent, so the round has to be paced explicitly
            time.sleep(60)


class NoReponseException(Exception):
    """Raised when a peer did not answer a request after all retries."""
    pass


def request_retry(node, addr, request, subscription, condition):
    """Send ``request`` to ``addr`` up to 10 times and return the answer.

    After each send, events are read from ``subscription`` (with a timeout
    argument of 1) until one satisfies ``condition``; that event is returned.
    If reading raises ``Empty``, the request is sent again.

    Raises:
        NoReponseException: if none of the 10 attempts got a matching answer.
    """
    for _ in range(10):
        node.node_socket.sendto(request, addr)
        try:
            while True:
                response = subscription.receive(1)
                if condition(response):
                    break
            return response
        except Empty:
            pass
    raise NoReponseException()


def transfer_block(addr, node, receive_observer, b):
    """Fetch the peer's latest block and its unknown ancestors from ``addr``.

    Starting with the latest block (requested with an all-zero hash), the
    chain is followed backwards via ``previous_hash`` until a block that is
    already known and valid, or the start of the chain, is reached. The
    fetched blocks are then validated from oldest to newest. The transfer is
    silently given up if the peer stops answering.
    """
    try:
        block_list = []
        request_block_hash = 32 * b"\0"
        subscription = receive_observer.listen((addr[0:2], "block transfer"))
        while True:
            if request_block_hash != 32 * b"\0":
                existing_block = b.get_block(request_block_hash)
                if existing_block is not None:
                    if existing_block.valid:
                        break
                    # known but not valid: keep walking back without a request
                    request_block_hash = existing_block.previous_hash
                    if request_block_hash == 32*b"\0":
                        break
                    continue
            request = b"\0\0\0\0\x01" + request_block_hash

            def condition(response_hash):
                # an all-zero request asks for the latest block, whose hash
                # is not known in advance, so any answer is accepted
                if request_block_hash == 32*b"\0":
                    return True
                return response_hash == request_block_hash

            block_hash = request_retry(node, addr, request, subscription, condition)
            block_list.append(block_hash)
            request_block_hash = b.get_block(block_hash).previous_hash
            if request_block_hash == 32*b"\0":
                break
        for block_hash in reversed(block_list):
            if not b.get_block(block_hash).validate(b):
                return
            if b.set_latest_block(block_hash):
                log("Got a new block")
    except NoReponseException:
        pass


def compare_open_transactions(addr, node, receive_observer, b):
    """Find one open transaction that differs from the peer's list and add it.

    A binary search over the list positions compares the remote list hash at
    the cursor with the local one. The transaction at the final cursor
    position is then requested and added to the local open transactions if it
    is valid. The comparison is silently given up if the peer stops answering.
    """
    try:
        cursor = 511
        offset = 256
        subscription = receive_observer.listen((addr[0:2], "transaction hash"))
        while True:
            request = b"\0\0\0\0\x03" + cursor.to_bytes(2, "big")

            def list_hash_condition(response):
                return response["position"] == cursor

            list_hash = request_retry(node, addr, request, subscription, list_hash_condition)
            if list_hash["hash"] == b.open_transactions.get_hash(cursor):
                # in the last step (offset 0) the cursor still has to advance
                cursor += max(offset, 1)
            else:
                cursor -= offset
            if offset == 0:
                break
            offset //= 2
        subscription = receive_observer.listen((addr[0:2], "transaction"))
        request = b"\0\0\0\0\x05" + cursor.to_bytes(2, "big")

        def transaction_condition(response):
            return response["position"] == cursor

        remote_transaction = request_retry(node, addr, request, subscription, transaction_condition)
        parsed_transaction = blockchain.Transaction.from_bytes(remote_transaction["transaction"])
        if not parsed_transaction.is_valid():
            return
        b.open_transactions.add(parsed_transaction)
    except NoReponseException:
        pass


def receiver(node, b):
    """Receive datagrams forever and handle them by message type.

    Requests (types 1, 3, 5, 7, 9) are answered directly. Responses (types 2,
    4, 6) are published on an observer so that running transfer threads can
    pick them up. Heartbeats (type 0) are registered on ``node`` and may start
    a block transfer or an open transaction comparison in a new thread.
    Malformed messages are logged and dropped.
    """
    receive_observer = observer.Observer()
    while True:
        msg, addr = node.node_socket.recvfrom(4096)
        sender = describe(addr[0], addr[1])
        msg_len = len(msg)
        if msg_len < 4:
            log(f"Got a udp message from {sender} that was too short.")
            continue
        version = int.from_bytes(msg[0:2], "big")
        if version != 0:
            log(f"Got a udp message of version {version} from {sender}, but only version 0 is supported.")
            continue
        if msg_len < 5:
            log(f"Got a udp message from {sender} that was too short. (missing type field)")
            continue
        msg_type = msg[4]
        if msg_type == 0:
            # heartbeat
            if msg_len != 87:
                log(f"Got a heartbeat message of wrong length ({msg_len} bytes from {sender}, but expected 87 bytes)")
                continue
            # register heartbeat of the sender
            node.add_heartbeat(addr[0:2])
            partner_info = msg[69:87]
            if partner_info != 18 * b"\0":
                # register the partner that was sent along
                partner_ip = socket.inet_ntop(socket.AF_INET6, partner_info[0:16])
                partner_port = int.from_bytes(partner_info[16:18], "big")
                node.add_partner((partner_ip, partner_port))
            contained_difficulty_sum = int.from_bytes(msg[5:37], "big")
            contained_transaction_hash = msg[37:69]
            my_difficulty_sum = b.get_second_last_difficulty_sum()
            if contained_difficulty_sum > my_difficulty_sum:
                log("beginning a block transfer ...")
                threading.Thread(target = transfer_block, args=(addr, node, receive_observer, b)).start()
            elif contained_transaction_hash != b.open_transactions.get_hash(1023):
                log("comparing open transactions ...")
                threading.Thread(target = compare_open_transactions, args=(addr, node, receive_observer, b)).start()
        elif msg_type == 1:
            # block request
            if msg_len != 37:
                log(f"Got a block request of wrong length ({msg_len} bytes from {sender}, but expected 37 bytes)")
                continue
            block_hash = msg[5:37]
            if block_hash == 32 * b"\0":
                # an all-zero hash asks for the latest block
                block_to_send = b.get_latest_block()
            else:
                block_to_send = b.get_block(block_hash)
            if block_to_send is None:
                continue
            block_raw = block_to_send.get_block_raw()
            response_msg = b"\0\0\0\0\x02" + block_raw
            node.node_socket.sendto(response_msg, addr)
        elif msg_type == 2:
            # block transfer
            if msg_len != 297:
                log(f"Got a block transfer of wrong length ({msg_len} bytes from {sender}, but expected 297 bytes)")
                continue
            block_raw = msg[5:297]
            new_block = b.add_block(block_raw)
            block_hash = new_block.own_hash
            if new_block.validate(b) and b.set_latest_block(block_hash):
                log("Got a new block")
            identifier = (addr[0:2], "block transfer")
            receive_observer.publish(identifier, block_hash)
        elif msg_type == 3:
            # open transaction list hash request
            if msg_len != 7:
                log(f"Got an open transaction list hash request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
                continue
            list_position = int.from_bytes(msg[5:7], "big")
            if list_position >= 1024:
                log(f"Got an open transaction list hash request with invalid position (position {list_position} from {sender}, but must be below 1024)")
                continue
            list_hash = b.open_transactions.get_hash(list_position)
            response = b"\0\0\0\0\x04" + list_position.to_bytes(2, "big") + list_hash
            node.node_socket.sendto(response, addr)
        elif msg_type == 4:
            # open transaction list hash response
            if msg_len != 39:
                log(f"Got an open transaction list hash response of wrong length ({msg_len} bytes from {sender}, but expected 39 bytes)")
                continue
            event_obj = {
                "position": int.from_bytes(msg[5:7], "big"),
                "hash": msg[7:39],
            }
            identifier = (addr[0:2], "transaction hash")
            receive_observer.publish(identifier, event_obj)
        elif msg_type == 5:
            # open transaction request
            if msg_len != 7:
                log(f"Got an open transaction request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
                continue
            list_position = int.from_bytes(msg[5:7], "big")
            if list_position >= 1024:
                log(f"Got an open transaction request with invalid position (position {list_position} from {sender}, but must be below 1024)")
                continue
            transaction = b.open_transactions.get_transaction(list_position)
            if transaction is None:
                transaction_raw = 148 * b"\0"
            else:
                transaction_raw = transaction.get_transaction_raw()
            response = b"\0\0\0\0\x06" + list_position.to_bytes(2, "big") + transaction_raw
            node.node_socket.sendto(response, addr)
        elif msg_type == 6:
            # open transaction response
            if msg_len != 155:
                log(f"Got an open transaction response of wrong length ({msg_len} bytes from {sender}, but expected 155 bytes)")
                continue
            event_obj = {
                "position": int.from_bytes(msg[5:7], "big"),
                "transaction": msg[7:155],
            }
            identifier = (addr[0:2], "transaction")
            receive_observer.publish(identifier, event_obj)
        elif msg_type == 7:
            # mining task request
            if msg_len != 257:
                log(f"Got a mining task request of wrong length ({msg_len} bytes from {sender}, but expected 257 bytes)")
                continue
            transaction = b.open_transactions.get_transaction(0)
            if transaction is not None:
                transaction_raw = transaction.get_transaction_raw()
            else:
                transaction_raw = 148 * b"\0"
            t = int(time.time())
            timestamp_raw = t.to_bytes(8, "big")
            latest_block = b.get_latest_block()
            if latest_block is not None:
                B_1_difficulty_sum, _ = latest_block.get_difficulty_info(0, b)
                B_10_difficulty_sum, B_10_timestamp = latest_block.get_difficulty_info(9, b)
                D = B_1_difficulty_sum - B_10_difficulty_sum
                T = t - B_10_timestamp
                # NOTE(review): T == 0 would raise ZeroDivisionError here and
                # end the receiver thread - confirm whether that can happen.
                calculated_difficulty = D * 3000 // 9 // T
                block_difficulty = max(calculated_difficulty, 2**28)
                difficulty_sum = B_1_difficulty_sum + block_difficulty
                previous_hash = latest_block.own_hash
            else:
                block_difficulty = 2**28
                difficulty_sum = 2**29
                previous_hash = 32 * b"\0"
            threshold = (2**256 - 1) // block_difficulty
            response = b"\0\0\0\0\x08" + \
                transaction_raw + \
                previous_hash + \
                timestamp_raw + \
                difficulty_sum.to_bytes(32, "big") + \
                threshold.to_bytes(32, "big")
            node.node_socket.sendto(response, addr)
        elif msg_type == 9:
            # payment request
            if msg_len != 153:
                log(f"Got a payment of wrong length ({msg_len} bytes from {sender}, but expected 153 bytes)")
                continue
            parsed_transaction = blockchain.Transaction.from_bytes(msg[5:153])
            if not parsed_transaction.is_valid():
                continue
            b.open_transactions.add(parsed_transaction)
            node.node_socket.sendto(b"\0\0\0\0\x0a", addr)
        else:
            log(f"Got a udp message of unknown type from {sender}. (type {msg_type})")


def main():
    """Start the node with the peers given as command line arguments.

    The socket is bound to ``DEFAULT_PORT``; if that port is already in use,
    a free port chosen by the operating system is used instead.
    """
    address_arguments = sys.argv[1:]
    peers = [parse_address(argument) for argument in address_arguments]
    node_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    try:
        node_socket.bind(("::", DEFAULT_PORT))
    except OSError as e:
        # errno.EADDRINUSE instead of the literal 98, which is only the
        # Linux value of that error code
        if e.errno == errno.EADDRINUSE:
            node_socket.bind(("::", 0))
            port = node_socket.getsockname()[1]
            log(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.")
        else:
            raise
    log("Node is ready")
    node = Node(node_socket, peers)
    b = blockchain.Blockchain()
    heartbeat_thread = threading.Thread(target = heartbeat, args = (node, b))
    receiving_thread = threading.Thread(target = receiver, args = (node, b))
    heartbeat_thread.start()
    receiving_thread.start()
    heartbeat_thread.join()
    receiving_thread.join()


if __name__ == '__main__':
    main()