Files
carrotcoin/node.py

235 lines
7.9 KiB
Python
Executable File

#! /usr/bin/env python3
import hashlib, random, socket, sys, threading, time
DEFAULT_PORT = 62039
class Peer:
def __init__(self, ipv6, port, lifetime_counter):
self.ipv6 = ipv6
self.port = port
self.lifetime_counter = lifetime_counter
self.partner = None
def get_address(self):
return self.ipv6, self.port
def __str__(self):
return describe(self.ipv6, self.port)
class Node:
def __init__(self, node_socket, peers):
self.lock = threading.Lock()
self.node_socket = node_socket
self.peers = peers
self.incoming_heartbeats = []
self.partners = []
def add_heartbeat(self, address):
with self.lock:
self.incoming_heartbeats.append(address)
def add_partner(self, address):
with self.lock:
self.partners.append(address)
def get_events(self):
with self.lock:
heartbeats = self.incoming_heartbeats
partners = self.partners
self.incoming_heartbeats = []
self.partners = []
return heartbeats, partners
def is_loopback(ipv6):
return ipv6 == 15 * b"\0" + b"\x01"
def is_unspecified(ipv6):
return ipv6 == 16 * b"\0"
def is_mapped_ipv4(ipv6):
return ipv6[0:12] == 10 * b"\0" + 2 * b"\xff"
def is_local_unicast(ipv6):
return (ipv6[0] & 0xfe) == 0xfc
def is_valid_address(ipv6):
ipv6 = socket.inet_pton(socket.AF_INET6, ipv6)
return not is_loopback(ipv6) and \
not is_unspecified(ipv6) and \
not is_mapped_ipv4(ipv6) and \
not is_local_unicast(ipv6)
def describe(ipv6, port):
if port == DEFAULT_PORT:
return ipv6
return f"[{ipv6}]:{port}"
def log(msg):
time_info = time.strftime("%d.%m.%Y %H:%M:%S")
print(f"[{time_info}] {msg}")
def parse_address(addr_str):
if addr_str.startswith("["):
closing = addr_str.find("]:")
if closing == -1:
raise Exception(f"Not a valid address: {addr_str}")
ipv6 = addr_str[1:closing]
port = int(addr_str[closing+2:])
else:
ipv6 = addr_str
port = DEFAULT_PORT
return Peer(ipv6, port, 10)
def wait_until(target_time):
duration = target_time - time.time()
if duration > 0:
time.sleep(duration)
def empty_transaction_list_hash():
current_hash = 32 * b"\0"
for _ in range(1024):
entry = current_hash + 44 * b"\0"
current_hash = hashlib.sha256(entry).digest()
return current_hash
def send_heartbeat(node, peer):
protocol_version = 2 * b"\0"
capable_version = 2 * b"\0"
msg_type = b"\0"
difficulty_sum = 32 * b"\0"
hash_value = empty_transaction_list_hash()
if peer.partner is None:
partner_ipv6 = 16 * b"\0"
partner_port = 2 * b"\0"
else:
partner_ipv6 = socket.inet_pton(socket.AF_INET6, peer.partner.ipv6)
partner_port = peer.partner.port.to_bytes(2, "big")
heartbeat_msg = protocol_version + capable_version + msg_type + \
difficulty_sum + hash_value + partner_ipv6 + partner_port
node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port))
def define_partnership(peers):
peers_to_pair = [peer for peer in peers if peer.lifetime_counter >= 8]
random.shuffle(peers_to_pair)
pairing_count = len(peers_to_pair) // 2
for pair_idx in range(pairing_count):
peers_to_pair[2*pair_idx].partner = peers_to_pair[2*pair_idx+1]
peers_to_pair[2*pair_idx+1].partner = peers_to_pair[2*pair_idx]
# in case of an odd count, the last one will remain without partner
if pairing_count % 2 == 1:
peers_to_pair[-1].partner = None
def heartbeat(node):
while True:
heartbeats, partners = node.get_events()
heartbeats = set(heartbeats)
partners = set(partners)
peerlist_update = False
for peer in node.peers:
address = peer.get_address()
if address in heartbeats:
peer.lifetime_counter = 10
heartbeats.remove(address)
else:
peer.lifetime_counter -= 1
if peer.lifetime_counter < 0:
log(f"Removing {peer} from the list (no contact)")
peerlist_update = True
if address in partners:
partners.remove(address)
# cleaning up inactives
node.peers = [peer for peer in node.peers if peer.lifetime_counter >= 0]
# adding new peers that contacted me
for new_peer in (heartbeats - partners):
if not is_valid_address(new_peer[0]):
log(f"Ignoring this address: {new_peer[0]}")
continue
peer = Peer(new_peer[0], new_peer[1], 10)
log(f"Adding peer {peer} to the list (It connected on its own)")
node.peers.append(peer)
peerlist_update = True
# adding partners
for new_peer in partners:
if not is_valid_address(new_peer[0]):
log(f"Ignoring this address: {new_peer[0]}")
continue
peer = Peer(new_peer[0], new_peer[1], 3)
log(f"Adding peer {peer} to the list (Got it as \"partner\" from another peer)")
node.peers.append(peer)
peerlist_update = True
peer_count = len(node.peers)
if peerlist_update:
if peer_count == 1:
log(f"There is now 1 peer in my list.")
else:
log(f"There are now {peer_count} peers in my list.")
start_time = time.time()
define_partnership(node.peers)
for i, peer in enumerate(node.peers):
wait_until(start_time + 60 * (i+1) / peer_count)
send_heartbeat(node, peer)
if len(node.peers) == 0:
time.sleep(60)
def receiver(node):
while True:
msg, addr = node.node_socket.recvfrom(4096)
sender = describe(addr[0], addr[1])
msg_len = len(msg)
if msg_len < 4:
log("Got a udp message from {sender} that was too short.")
continue
version = int.from_bytes(msg[0:2], "big")
if version != 0:
log(f"Got a udp message of version {version} from {sender}, but only version 0 is supported.")
continue
if msg_len < 5:
log(f"Got a udp message from {sender} that was too short. (missing type field)")
continue
msg_type = msg[4]
if msg_type == 0:
# heartbeat
if msg_len != 87:
log(f"Got a heartbeat message of wrong length ({msg_len} bytes from {sender}, but expected 87 bytes)")
continue
# register heartbeat of the sender
node.add_heartbeat(addr[0:2])
partner_info = msg[69:87]
if partner_info != 18 * b"\0":
# register the partner that was sent along
partner_ip = socket.inet_ntop(socket.AF_INET6, partner_info[0:16])
partner_port = int.from_bytes(partner_info[16:18], "big")
node.add_partner((partner_ip, partner_port))
else:
log(f"Got a udp message of unknown type from {sender}. (type {msg_type})")
def main():
address_arguments = sys.argv[1:]
peers = [parse_address(argument) for argument in address_arguments]
node_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
try:
node_socket.bind(("::", DEFAULT_PORT))
except OSError as e:
if e.errno == 98:
node_socket.bind(("::", 0))
port = node_socket.getsockname()[1]
log(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.")
else:
raise e
log("Node is ready")
node = Node(node_socket, peers)
heartbeat_thread = threading.Thread(target = heartbeat, args = (node,))
receiving_thread = threading.Thread(target = receiver, args = (node,))
heartbeat_thread.start()
receiving_thread.start()
heartbeat_thread.join()
receiving_thread.join()
if __name__ == '__main__':
main()