437 lines
18 KiB
Python
Executable File
437 lines
18 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
import blockchain, hashlib, observer, random, socket, sys, threading, time
|
|
from _queue import Empty
|
|
|
|
DEFAULT_PORT = 62039
|
|
|
|
class Peer:
|
|
def __init__(self, ipv6, port, lifetime_counter):
|
|
self.ipv6 = ipv6
|
|
self.port = port
|
|
self.lifetime_counter = lifetime_counter
|
|
self.partner = None
|
|
def get_address(self):
|
|
return self.ipv6, self.port
|
|
def __str__(self):
|
|
return describe(self.ipv6, self.port)
|
|
|
|
class Node:
|
|
def __init__(self, node_socket, peers):
|
|
self.lock = threading.Lock()
|
|
self.node_socket = node_socket
|
|
self.peers = peers
|
|
self.incoming_heartbeats = []
|
|
self.partners = []
|
|
def add_heartbeat(self, address):
|
|
with self.lock:
|
|
self.incoming_heartbeats.append(address)
|
|
def add_partner(self, address):
|
|
with self.lock:
|
|
self.partners.append(address)
|
|
def get_events(self):
|
|
with self.lock:
|
|
heartbeats = self.incoming_heartbeats
|
|
partners = self.partners
|
|
self.incoming_heartbeats = []
|
|
self.partners = []
|
|
return heartbeats, partners
|
|
|
|
def is_loopback(ipv6):
|
|
return ipv6 == 15 * b"\0" + b"\x01"
|
|
|
|
def is_unspecified(ipv6):
|
|
return ipv6 == 16 * b"\0"
|
|
|
|
def is_mapped_ipv4(ipv6):
|
|
return ipv6[0:12] == 10 * b"\0" + 2 * b"\xff"
|
|
|
|
def is_local_unicast(ipv6):
|
|
return (ipv6[0] & 0xfe) == 0xfc
|
|
|
|
def is_valid_address(ipv6):
|
|
ipv6 = socket.inet_pton(socket.AF_INET6, ipv6)
|
|
return not is_loopback(ipv6) and \
|
|
not is_unspecified(ipv6) and \
|
|
not is_mapped_ipv4(ipv6) and \
|
|
not is_local_unicast(ipv6)
|
|
|
|
def describe(ipv6, port):
|
|
if port == DEFAULT_PORT:
|
|
return ipv6
|
|
return f"[{ipv6}]:{port}"
|
|
|
|
def log(msg):
|
|
time_info = time.strftime("%d.%m.%Y %H:%M:%S")
|
|
print(f"[{time_info}] {msg}")
|
|
|
|
def parse_address(addr_str):
|
|
if addr_str.startswith("["):
|
|
closing = addr_str.find("]:")
|
|
if closing == -1:
|
|
raise Exception(f"Not a valid address: {addr_str}")
|
|
ipv6 = addr_str[1:closing]
|
|
port = int(addr_str[closing+2:])
|
|
else:
|
|
ipv6 = addr_str
|
|
port = DEFAULT_PORT
|
|
return Peer(ipv6, port, 10)
|
|
|
|
def wait_until(target_time):
|
|
duration = target_time - time.time()
|
|
if duration > 0:
|
|
time.sleep(duration)
|
|
|
|
def send_heartbeat(node, peer, b):
|
|
protocol_version = 2 * b"\0"
|
|
capable_version = 2 * b"\0"
|
|
msg_type = b"\0"
|
|
difficulty_sum = b.get_second_last_difficulty_sum().to_bytes(32, "big")
|
|
hash_value = b.open_transactions.get_hash(1023)
|
|
|
|
if peer.partner is None:
|
|
partner_ipv6 = 16 * b"\0"
|
|
partner_port = 2 * b"\0"
|
|
else:
|
|
partner_ipv6 = socket.inet_pton(socket.AF_INET6, peer.partner.ipv6)
|
|
partner_port = peer.partner.port.to_bytes(2, "big")
|
|
|
|
heartbeat_msg = protocol_version + capable_version + msg_type + \
|
|
difficulty_sum + hash_value + partner_ipv6 + partner_port
|
|
node.node_socket.sendto(heartbeat_msg, (peer.ipv6, peer.port))
|
|
|
|
def define_partnership(peers):
|
|
peers_to_pair = [peer for peer in peers if peer.lifetime_counter >= 8]
|
|
random.shuffle(peers_to_pair)
|
|
pairing_count = len(peers_to_pair) // 2
|
|
for pair_idx in range(pairing_count):
|
|
peers_to_pair[2*pair_idx].partner = peers_to_pair[2*pair_idx+1]
|
|
peers_to_pair[2*pair_idx+1].partner = peers_to_pair[2*pair_idx]
|
|
# in case of an odd count, the last one will remain without partner
|
|
if pairing_count % 2 == 1:
|
|
peers_to_pair[-1].partner = None
|
|
|
|
def heartbeat(node, b):
|
|
while True:
|
|
heartbeats, partners = node.get_events()
|
|
heartbeats = set(heartbeats)
|
|
partners = set(partners)
|
|
peerlist_update = False
|
|
for peer in node.peers:
|
|
address = peer.get_address()
|
|
if address in heartbeats:
|
|
peer.lifetime_counter = 10
|
|
heartbeats.remove(address)
|
|
else:
|
|
peer.lifetime_counter -= 1
|
|
if peer.lifetime_counter < 0:
|
|
log(f"Removing {peer} from the list (no contact)")
|
|
peerlist_update = True
|
|
if address in partners:
|
|
partners.remove(address)
|
|
|
|
# cleaning up inactives
|
|
node.peers = [peer for peer in node.peers if peer.lifetime_counter >= 0]
|
|
|
|
# adding new peers that contacted me
|
|
for new_peer in (heartbeats - partners):
|
|
if not is_valid_address(new_peer[0]):
|
|
log(f"Ignoring this address: {new_peer[0]}")
|
|
continue
|
|
peer = Peer(new_peer[0], new_peer[1], 10)
|
|
log(f"Adding peer {peer} to the list (It connected on its own)")
|
|
node.peers.append(peer)
|
|
peerlist_update = True
|
|
|
|
# adding partners
|
|
for new_peer in partners:
|
|
if not is_valid_address(new_peer[0]):
|
|
log(f"Ignoring this address: {new_peer[0]}")
|
|
continue
|
|
peer = Peer(new_peer[0], new_peer[1], 3)
|
|
log(f"Adding peer {peer} to the list (Got it as \"partner\" from another peer)")
|
|
node.peers.append(peer)
|
|
peerlist_update = True
|
|
|
|
peer_count = len(node.peers)
|
|
if peerlist_update:
|
|
if peer_count == 1:
|
|
log(f"There is now 1 peer in my list.")
|
|
else:
|
|
log(f"There are now {peer_count} peers in my list.")
|
|
start_time = time.time()
|
|
define_partnership(node.peers)
|
|
for i, peer in enumerate(node.peers):
|
|
wait_until(start_time + 60 * (i+1) / peer_count)
|
|
send_heartbeat(node, peer, b)
|
|
if len(node.peers) == 0:
|
|
time.sleep(60)
|
|
|
|
class NoReponseException(Exception):
|
|
pass
|
|
|
|
def request_retry(node, addr, request, subscription, condition):
|
|
for _ in range(10):
|
|
node.node_socket.sendto(request, addr)
|
|
try:
|
|
while True:
|
|
response = subscription.receive(1)
|
|
if condition(response):
|
|
break
|
|
return response
|
|
except Empty:
|
|
pass
|
|
raise NoReponseException()
|
|
|
|
def transfer_block(addr, node, receive_observer, b):
|
|
try:
|
|
block_list = []
|
|
request_block_hash = 32 * b"\0"
|
|
subscription = receive_observer.listen((addr[0:2], "block transfer"))
|
|
while True:
|
|
if request_block_hash != 32 * b"\0":
|
|
existing_block = b.get_block(request_block_hash)
|
|
if existing_block is not None:
|
|
if existing_block.valid:
|
|
break
|
|
request_block_hash = existing_block.previous_hash
|
|
if request_block_hash == 32*b"\0":
|
|
break
|
|
continue
|
|
request = b"\0\0\0\0\x01" + request_block_hash
|
|
def condition(response_hash):
|
|
if request_block_hash == 32*b"\0":
|
|
return True
|
|
return response_hash == request_block_hash
|
|
block_hash = request_retry(node, addr, request, subscription, condition)
|
|
block_list.append(block_hash)
|
|
request_block_hash = b.get_block(block_hash).previous_hash
|
|
if request_block_hash == 32*b"\0":
|
|
break
|
|
for block_hash in reversed(block_list):
|
|
if not b.get_block(block_hash).validate(b):
|
|
return
|
|
if b.set_latest_block(block_hash):
|
|
log("Got a new block")
|
|
except NoReponseException:
|
|
pass
|
|
|
|
def compare_open_transactions(addr, node, receive_observer, b):
|
|
try:
|
|
cursor = 511
|
|
offset = 256
|
|
subscription = receive_observer.listen((addr[0:2], "transaction hash"))
|
|
while True:
|
|
request = b"\0\0\0\0\x03" + cursor.to_bytes(2, "big")
|
|
def list_hash_condition(response):
|
|
return response["position"] == cursor
|
|
list_hash = request_retry(node, addr, request, subscription, list_hash_condition)
|
|
if list_hash["hash"] == b.open_transactions.get_hash(cursor):
|
|
cursor += max(offset, 1)
|
|
else:
|
|
cursor -= offset
|
|
if offset == 0:
|
|
break
|
|
offset //= 2
|
|
subscription = receive_observer.listen((addr[0:2], "transaction"))
|
|
request = b"\0\0\0\0\x05" + cursor.to_bytes(2, "big")
|
|
def transaction_condition(response):
|
|
return response["position"] == cursor
|
|
remote_transaction = request_retry(node, addr, request, subscription, transaction_condition)
|
|
parsed_transaction = blockchain.Transaction.from_bytes(remote_transaction["transaction"])
|
|
if not parsed_transaction.is_valid():
|
|
return
|
|
b.open_transactions.add(parsed_transaction)
|
|
except NoReponseException:
|
|
pass
|
|
|
|
def receiver(node, b):
|
|
receive_observer = observer.Observer()
|
|
while True:
|
|
msg, addr = node.node_socket.recvfrom(4096)
|
|
sender = describe(addr[0], addr[1])
|
|
msg_len = len(msg)
|
|
if msg_len < 4:
|
|
log("Got a udp message from {sender} that was too short.")
|
|
continue
|
|
version = int.from_bytes(msg[0:2], "big")
|
|
if version != 0:
|
|
log(f"Got a udp message of version {version} from {sender}, but only version 0 is supported.")
|
|
continue
|
|
if msg_len < 5:
|
|
log(f"Got a udp message from {sender} that was too short. (missing type field)")
|
|
continue
|
|
msg_type = msg[4]
|
|
if msg_type == 0:
|
|
# heartbeat
|
|
if msg_len != 87:
|
|
log(f"Got a heartbeat message of wrong length ({msg_len} bytes from {sender}, but expected 87 bytes)")
|
|
continue
|
|
# register heartbeat of the sender
|
|
node.add_heartbeat(addr[0:2])
|
|
partner_info = msg[69:87]
|
|
if partner_info != 18 * b"\0":
|
|
# register the partner that was sent along
|
|
partner_ip = socket.inet_ntop(socket.AF_INET6, partner_info[0:16])
|
|
partner_port = int.from_bytes(partner_info[16:18], "big")
|
|
node.add_partner((partner_ip, partner_port))
|
|
contained_difficulty_sum = int.from_bytes(msg[5:37])
|
|
contained_transaction_hash = msg[37:69]
|
|
my_difficulty_sum = b.get_second_last_difficulty_sum()
|
|
if contained_difficulty_sum > my_difficulty_sum:
|
|
log("beginning a block transfer ...")
|
|
threading.Thread(target = transfer_block, args=(addr, node, receive_observer, b)).start()
|
|
elif contained_transaction_hash != b.open_transactions.get_hash(1023):
|
|
log("comparing open transactions ...")
|
|
threading.Thread(target = compare_open_transactions, args=(addr, node, receive_observer, b)).start()
|
|
elif msg_type == 1:
|
|
# block request
|
|
if msg_len != 37:
|
|
log(f"Got a block request of wrong length ({msg_len} bytes from {sender}, but expected 37 bytes)")
|
|
block_hash = msg[5:37]
|
|
if block_hash == 32 * b"\0":
|
|
block_to_send = b.get_latest_block()
|
|
else:
|
|
block_to_send = b.get_block(block_hash)
|
|
if block_to_send is None:
|
|
continue
|
|
block_raw = block_to_send.get_block_raw()
|
|
response_msg = b"\0\0\0\0\x02" + block_raw
|
|
node.node_socket.sendto(response_msg, addr)
|
|
elif msg_type == 2:
|
|
# block transfer
|
|
if msg_len != 297:
|
|
log(f"Got a block transfer of wrong length ({msg_len} bytes from {sender}, but expected 297 bytes)")
|
|
continue
|
|
block_raw = msg[5:297]
|
|
new_block = b.add_block(block_raw)
|
|
block_hash = new_block.own_hash
|
|
if new_block.validate(b) and b.set_latest_block(block_hash):
|
|
log("Got a new block")
|
|
identifier = (addr[0:2], "block transfer")
|
|
receive_observer.publish(identifier, block_hash)
|
|
elif msg_type == 3:
|
|
# open transaction list hash request
|
|
if msg_len != 7:
|
|
log(f"Got an open transaction list hash request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
|
|
continue
|
|
list_position = int.from_bytes(msg[5:7], "big")
|
|
if list_position >= 1024:
|
|
log(f"Got an open transaction list hash request with invalid position (position {list_position} from {sender}, but must be below 1024)")
|
|
continue
|
|
list_hash = b.open_transactions.get_hash(list_position)
|
|
response = b"\0\0\0\0\x04" + list_position.to_bytes(2, "big") + list_hash
|
|
node.node_socket.sendto(response, addr)
|
|
elif msg_type == 4:
|
|
# open transaction list hash response
|
|
if msg_len != 39:
|
|
log(f"Got an open transaction list hash response of wrong length ({msg_len} bytes from {sender}, but expected 39 bytes)")
|
|
continue
|
|
event_obj = {
|
|
"position": int.from_bytes(msg[5:7], "big"),
|
|
"hash": msg[7:39],
|
|
}
|
|
identifier = (addr[0:2], "transaction hash")
|
|
receive_observer.publish(identifier, event_obj)
|
|
elif msg_type == 5:
|
|
# open transaction request
|
|
if msg_len != 7:
|
|
log(f"Got an open transaction request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
|
|
continue
|
|
list_position = int.from_bytes(msg[5:7], "big")
|
|
if list_position >= 1024:
|
|
log(f"Got an open transaction request with invalid position (position {list_position} from {sender}, but must be below 1024)")
|
|
continue
|
|
transaction = b.open_transactions.get_transaction(list_position)
|
|
if transaction is None:
|
|
transaction_raw = 148 * b"\0"
|
|
else:
|
|
transaction_raw = transaction.get_transaction_raw()
|
|
response = b"\0\0\0\0\x06" + list_position.to_bytes(2, "big") + transaction_raw
|
|
node.node_socket.sendto(response, addr)
|
|
elif msg_type == 6:
|
|
# open transaction response
|
|
if msg_len != 155:
|
|
log(f"Got an open transaction list hash response of wrong length ({msg_len} bytes from {sender}, but expected 155 bytes)")
|
|
continue
|
|
event_obj = {
|
|
"position": int.from_bytes(msg[5:7], "big"),
|
|
"transaction": msg[7:155],
|
|
}
|
|
identifier = (addr[0:2], "transaction")
|
|
receive_observer.publish(identifier, event_obj)
|
|
elif msg_type == 7:
|
|
# mining task request
|
|
if msg_len != 257:
|
|
log(f"Got a mining task request of wrong length ({msg_len} bytes from {sender}, but expected 257 bytes)")
|
|
continue
|
|
transaction = b.open_transactions.get_transaction(0)
|
|
if transaction is not None:
|
|
transaction_raw = transaction.get_transaction_raw()
|
|
else:
|
|
transaction_raw = 148 * b"\0"
|
|
t = int(time.time())
|
|
timestamp_raw = t.to_bytes(8, "big")
|
|
latest_block = b.get_latest_block()
|
|
if latest_block is not None:
|
|
B_1_difficulty_sum, _ = latest_block.get_difficulty_info(0, b)
|
|
B_10_difficulty_sum, B_10_timestamp = latest_block.get_difficulty_info(9, b)
|
|
D = B_1_difficulty_sum - B_10_difficulty_sum
|
|
T = t - B_10_timestamp
|
|
calculated_difficulty = D * 3000 // 9 // T
|
|
block_difficulty = max(calculated_difficulty, 2**28)
|
|
difficulty_sum = B_1_difficulty_sum + block_difficulty
|
|
previous_hash = latest_block.own_hash
|
|
else:
|
|
block_difficulty = 2**28
|
|
difficulty_sum = 2**29
|
|
previous_hash = 32 * b"\0"
|
|
threshold = (2**256 - 1) // block_difficulty
|
|
response = b"\0\0\0\0\x08" + \
|
|
transaction_raw + \
|
|
previous_hash + \
|
|
timestamp_raw + \
|
|
difficulty_sum.to_bytes(32, "big") + \
|
|
threshold.to_bytes(32, "big")
|
|
node.node_socket.sendto(response, addr)
|
|
elif msg_type == 9:
|
|
# payment request
|
|
if msg_len != 153:
|
|
log(f"Got a payment of wrong length ({msg_len} bytes from {sender}, but expected 153 bytes)")
|
|
continue
|
|
parsed_transaction = blockchain.Transaction.from_bytes(msg[5:153])
|
|
if not parsed_transaction.is_valid():
|
|
continue
|
|
b.open_transactions.add(parsed_transaction)
|
|
else:
|
|
log(f"Got a udp message of unknown type from {sender}. (type {msg_type})")
|
|
|
|
def main():
|
|
address_arguments = sys.argv[1:]
|
|
peers = [parse_address(argument) for argument in address_arguments]
|
|
|
|
node_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
|
try:
|
|
node_socket.bind(("::", DEFAULT_PORT))
|
|
except OSError as e:
|
|
if e.errno == 98:
|
|
node_socket.bind(("::", 0))
|
|
port = node_socket.getsockname()[1]
|
|
log(f"Default port {DEFAULT_PORT} is in use, listening on port {port} instead.")
|
|
else:
|
|
raise e
|
|
|
|
log("Node is ready")
|
|
|
|
node = Node(node_socket, peers)
|
|
b = blockchain.Blockchain()
|
|
heartbeat_thread = threading.Thread(target = heartbeat, args = (node, b))
|
|
receiving_thread = threading.Thread(target = receiver, args = (node, b))
|
|
heartbeat_thread.start()
|
|
receiving_thread.start()
|
|
heartbeat_thread.join()
|
|
receiving_thread.join()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|