Implement exchange of open transactions

This commit is contained in:
2024-03-20 22:23:26 +01:00
parent 0218e34787
commit c6394d2ca1
5 changed files with 233 additions and 8 deletions

93
node.py
View File

@@ -82,19 +82,12 @@ def wait_until(target_time):
if duration > 0:
time.sleep(duration)
def empty_transaction_list_hash():
current_hash = 32 * b"\0"
for _ in range(1024):
entry = current_hash + 44 * b"\0"
current_hash = hashlib.sha256(entry).digest()
return current_hash
def send_heartbeat(node, peer, b):
protocol_version = 2 * b"\0"
capable_version = 2 * b"\0"
msg_type = b"\0"
difficulty_sum = b.get_second_last_difficulty_sum().to_bytes(32, "big")
hash_value = empty_transaction_list_hash()
hash_value = b.open_transactions.get_hash(1023)
if peer.partner is None:
partner_ipv6 = 16 * b"\0"
@@ -223,6 +216,35 @@ def transfer_block(addr, node, receive_observer, b):
except NoReponseException:
pass
def compare_open_transactions(addr, node, receive_observer, b):
try:
cursor = 511
offset = 256
subscription = receive_observer.listen((addr[0:2], "transaction hash"))
while True:
request = b"\0\0\0\0\x03" + cursor.to_bytes(2, "big")
def list_hash_condition(response):
return response["position"] == cursor
list_hash = request_retry(node, addr, request, subscription, list_hash_condition)
if list_hash["hash"] == b.open_transactions.get_hash(cursor):
cursor += max(offset, 1)
else:
cursor -= offset
if offset == 0:
break
offset //= 2
subscription = receive_observer.listen((addr[0:2], "transaction"))
request = b"\0\0\0\0\x05" + cursor.to_bytes(2, "big")
def transaction_condition(response):
return response["position"] == cursor
remote_transaction = request_retry(node, addr, request, subscription, transaction_condition)
parsed_transaction = blockchain.Transaction.from_bytes(remote_transaction["transaction"])
if not parsed_transaction.is_valid():
return
b.open_transactions.add(parsed_transaction)
except NoReponseException:
pass
def receiver(node, b):
receive_observer = observer.Observer()
while True:
@@ -254,10 +276,14 @@ def receiver(node, b):
partner_port = int.from_bytes(partner_info[16:18], "big")
node.add_partner((partner_ip, partner_port))
contained_difficulty_sum = int.from_bytes(msg[5:37])
contained_transaction_hash = msg[37:69]
my_difficulty_sum = b.get_second_last_difficulty_sum()
if contained_difficulty_sum > my_difficulty_sum:
log("beginning a block transfer ...")
threading.Thread(target = transfer_block, args=(addr, node, receive_observer, b)).start()
elif contained_transaction_hash != b.open_transactions.get_hash(1023):
log("comparing open transactions ...")
threading.Thread(target = compare_open_transactions, args=(addr, node, receive_observer, b)).start()
elif msg_type == 1:
# block request
if msg_len != 37:
@@ -276,6 +302,7 @@ def receiver(node, b):
# block transfer
if msg_len != 297:
log(f"Got a block transfer of wrong length ({msg_len} bytes from {sender}, but expected 297 bytes)")
continue
block_raw = msg[5:297]
new_block = b.add_block(block_raw)
block_hash = new_block.own_hash
@@ -283,6 +310,56 @@ def receiver(node, b):
log("Got a new block")
identifier = (addr[0:2], "block transfer")
receive_observer.publish(identifier, block_hash)
elif msg_type == 3:
# open transaction list hash request
if msg_len != 7:
log(f"Got an open transaction list hash request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
continue
list_position = int.from_bytes(msg[5:7], "big")
if list_position >= 1024:
log(f"Got an open transaction list hash request with invalid position (position {list_position} from {sender}, but must be below 1024)")
continue
list_hash = b.open_transactions.get_hash(list_position)
response = b"\0\0\0\0\x04" + list_position.to_bytes(2, "big") + list_hash
node.node_socket.sendto(response, addr)
elif msg_type == 4:
# open transaction list hash response
if msg_len != 39:
log(f"Got an open transaction list hash response of wrong length ({msg_len} bytes from {sender}, but expected 39 bytes)")
continue
event_obj = {
"position": int.from_bytes(msg[5:7], "big"),
"hash": msg[7:39],
}
identifier = (addr[0:2], "transaction hash")
receive_observer.publish(identifier, event_obj)
elif msg_type == 5:
# open transaction request
if msg_len != 7:
log(f"Got an open transaction request of wrong length ({msg_len} bytes from {sender}, but expected 7 bytes)")
continue
list_position = int.from_bytes(msg[5:7], "big")
if list_position >= 1024:
log(f"Got an open transaction request with invalid position (position {list_position} from {sender}, but must be below 1024)")
continue
transaction = b.open_transactions.get_transaction(list_position)
if transaction is None:
transaction_raw = 148 * b"\0"
else:
transaction_raw = transaction.get_transaction_raw()
response = b"\0\0\0\0\x06" + list_position.to_bytes(2, "big") + transaction_raw
node.node_socket.sendto(response, addr)
elif msg_type == 6:
# open transaction response
if msg_len != 155:
log(f"Got an open transaction list hash response of wrong length ({msg_len} bytes from {sender}, but expected 155 bytes)")
continue
event_obj = {
"position": int.from_bytes(msg[5:7], "big"),
"transaction": msg[7:155],
}
identifier = (addr[0:2], "transaction")
receive_observer.publish(identifier, event_obj)
else:
log(f"Got a udp message of unknown type from {sender}. (type {msg_type})")