from dataclasses import dataclass
import hashlib
import time

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)


@dataclass
class Transaction:
    """A transfer of ``amount`` from ``sender`` to ``receiver``.

    The transfer is authorised by an Ed25519 signature made with the
    sender's key over the serialized fields (see ``_signed_message``).
    """

    # Serialized as 4 bytes; (sender, id) pairs are tracked per block in
    # ``FullBlock.used_transaction_ids``.
    id: int
    # Raw Ed25519 public key bytes of the paying party.
    sender: bytes
    # Presumably the raw public key of the receiving party -- it is only
    # ever concatenated into the signed message here, never parsed.
    receiver: bytes
    # Serialized as 8 bytes, unsigned, big-endian.
    amount: int
    # Serialized as 8 bytes, unsigned, big-endian.
    transaction_fee: int
    # Ed25519 signature over ``_signed_message()``.
    signature: bytes

    def _signed_message(self):
        """Return the byte string that ``signature`` must cover.

        Raises:
            OverflowError: if ``id``, ``amount`` or ``transaction_fee`` is
                negative or does not fit its fixed width.
        """
        return (
            self.id.to_bytes(4, "big")
            + self.sender
            + self.receiver
            + self.amount.to_bytes(8, "big")
            # Fixed: the original called the int itself
            # (``self.transaction_fee(8, "big")``), which raises TypeError.
            + self.transaction_fee.to_bytes(8, "big")
        )

    def _raw(self):
        """Return the signed message followed by the signature."""
        return self._signed_message() + self.signature

    def is_valid(self):
        """Return True if the signature verifies and ``amount`` is >= 1.

        A sender key that cannot be parsed, or an integer field that cannot
        be serialized, makes the transaction invalid instead of raising.
        """
        try:
            sender_pubkey = Ed25519PublicKey.from_public_bytes(self.sender)
            msg = self._signed_message()
        except (ValueError, OverflowError):
            # ValueError: sender bytes are not a well-formed public key.
            # OverflowError: negative or oversized integer field.
            return False
        try:
            sender_pubkey.verify(self.signature, msg)
        except InvalidSignature:
            return False
        return self.amount >= 1

    def is_valid_after_block(self, block):
        """Return True if this transaction may follow ``block``.

        The (sender, id) pair must not already be recorded in
        ``block.used_transaction_ids``, and the sender's entry in
        ``block.balances`` must cover ``amount + transaction_fee``.
        """
        if (self.sender, self.id) in block.used_transaction_ids:
            return False
        balance = block.balances.get(self.sender)
        if balance is None:
            return False
        return balance >= self.amount + self.transaction_fee


@dataclass
class FullBlock:
    """A block together with the ledger state stored alongside it."""

    nonce: int
    timestamp: int
    # 32 zero bytes mark a block without a predecessor (genesis).
    previous_hash: bytes
    message: bytes
    # Accumulated difficulty of the chain up to and including this block.
    difficulty_sum: int
    miner_pubkey: bytes
    # Optional: None when the block carries no transaction.
    transaction: Transaction
    balances: dict
    # (sender_pubkey, id) tuples
    used_transaction_ids: set

    def is_valid(self, blockchain):
        """Return True if this block passes all consistency checks.

        Checks, in order: the transaction's own validity; for a non-genesis
        block, that the predecessor is a known ``FullBlock``, that the
        timestamp is later than the predecessor's and not in the future,
        and that the transaction is spendable after the predecessor; for a
        genesis block, that it carries no transaction; that
        ``difficulty_sum`` matches the recomputed difficulty; and finally
        the proof-of-work inequality on the SHA-256 of ``get_block_raw()``.

        NOTE(review): the original indentation of this method was lost;
        the ``elif`` is read as pairing with the ``previous_hash`` check,
        which is the only reading in which it is not dead code.

        Args:
            blockchain: object whose ``get_block(hash)`` returns the block
                stored under ``hash`` or None.
        """
        transaction = self.transaction
        if transaction is not None and not transaction.is_valid():
            return False

        if self.previous_hash != 32 * b"\0":
            prev_block = blockchain.get_block(self.previous_hash)
            if not isinstance(prev_block, FullBlock):
                return False
            if self.timestamp <= prev_block.timestamp:
                return False
            if self.timestamp > time.time():
                return False
            # Fixed: a block without a transaction used to crash here with
            # AttributeError on None.
            if (transaction is not None
                    and not transaction.is_valid_after_block(prev_block)):
                return False
        elif transaction is not None:
            # A genesis block has no prior balances to spend from.
            return False

        B_1_difficulty_sum, B_1_timestamp = self.get_difficulty_info(
            1, blockchain)
        B_10_difficulty_sum, B_10_timestamp = self.get_difficulty_info(
            10, blockchain)
        D = B_1_difficulty_sum - B_10_difficulty_sum
        T = self.timestamp - B_10_timestamp
        if T <= 0:
            # Non-increasing timestamps in the ancestry; also avoids a
            # ZeroDivisionError in the floor division below.
            return False
        # D is the difficulty accumulated over the 9 blocks between the
        # 10th and the 1st ancestor, T the time from the 10th ancestor to
        # this block. Presumably 3000 is 10 blocks at a 300 s target,
        # matching the 300 s step used in get_difficulty_info -- confirm.
        calculated_difficulty = D * 3000 // 9 // T
        block_difficulty = max(calculated_difficulty, 2**28)
        if B_1_difficulty_sum + block_difficulty != self.difficulty_sum:
            return False

        try:
            block_raw = self.get_block_raw()
        except OverflowError:
            # An integer field does not fit its fixed serialized width.
            return False
        block_hash = hashlib.sha256(block_raw).digest()
        return int.from_bytes(block_hash, "big") * block_difficulty < 2**256

    def get_difficulty_info(self, steps, blockchain):
        """Return ``(difficulty_sum, timestamp)`` ``steps`` blocks back.

        ``steps == 0`` refers to this block. If the genesis block is
        reached with steps remaining, a virtual history is extrapolated
        from it: each missing step subtracts 2**28 from a genesis
        difficulty sum of 2**29 and 300 from the genesis timestamp.

        Raises:
            AttributeError: if an ancestor is missing from ``blockchain``
                (``get_block`` returned None).
        """
        if steps == 0:
            return self.difficulty_sum, self.timestamp
        if self.previous_hash == 32 * b"\0":
            difficulty_sum = 2**29 - steps * 2**28
            timestamp = self.timestamp - steps * 300
            return difficulty_sum, timestamp
        previous_block = blockchain.get_block(self.previous_hash)
        return previous_block.get_difficulty_info(steps - 1, blockchain)

    def get_block_raw(self):
        """Return the byte serialization that is hashed for proof of work.

        Added because ``is_valid`` calls this method but the class did not
        define it. The field order and widths mirror
        ``Datastructure.get_block_raw``.

        NOTE(review): a block without a transaction is serialized with 148
        zero bytes in the transaction slot. This is an assumption derived
        from the 148-byte width used by ``Datastructure`` -- confirm it
        against the protocol. The transaction part is exactly 148 bytes
        only when sender and receiver are 32 bytes and the signature is 64.

        Raises:
            OverflowError: if an integer field is negative or too large
                for its fixed width.
        """
        if self.transaction is None:
            transaction_raw = bytes(148)
        else:
            transaction_raw = self.transaction._raw()
        return (
            self.nonce.to_bytes(8, "big")
            + self.timestamp.to_bytes(8, "big")
            + self.previous_hash
            + self.message
            + self.difficulty_sum.to_bytes(32, "big")
            + self.miner_pubkey
            + transaction_raw
        )


@dataclass
class Datastructure:
    """Block header fields with the transaction held as one integer."""

    nonce: int
    timestamp: int
    previous_hash: bytes
    message: bytes
    difficulty_sum: int
    miner_pubkey: bytes
    # Serialized as 148 bytes, unsigned, big-endian.
    transaction: int

    def get_block_raw(self):
        """Return the concatenated byte serialization of all fields.

        Fixed: the original read a nonexistent ``self.sender``, tried to
        verify ``previous_hash`` as a signature and returned a bool, so it
        could only ever raise AttributeError. It now returns the bytes it
        had already assembled.

        Raises:
            OverflowError: if an integer field is negative or too large
                for its fixed width.
        """
        return (
            self.nonce.to_bytes(8, "big")
            + self.timestamp.to_bytes(8, "big")
            + self.previous_hash
            + self.message
            + self.difficulty_sum.to_bytes(32, "big")
            + self.miner_pubkey
            + self.transaction.to_bytes(148, "big")
        )

    def is_valid_after_transaction(self, transaction=None):
        """Return False when no transaction is stored on this instance.

        Fixed: the original signature ``(self.tansaction)`` was a syntax
        error that prevented the whole module from loading.

        NOTE(review): the original body had no success path and fell
        through to an implicit None. That is preserved here because the
        intended checks cannot be determined; ``transaction`` is unused.
        """
        if self.transaction is None:
            return False
        return None


class Blockchain:
    """In-memory lookup of blocks by their hash."""

    def __init__(self):
        # maps block hashes to block instances
        self.__block_map = {}

    def get_block(self, hash):
        """Return the block stored under ``hash``, or None if unknown."""
        return self.__block_map.get(hash)