Source code for zpywallet.wallet

# -*- coding: utf-8 -*-
"""
This module contains the methods for creating a crypto wallet.
"""

import json
import math
from os import urandom
from Cryptodome import Random
from typing import List

from zpywallet.utxo import UTXO

from .destination import Destination, FeePolicy

from .mnemonic import Mnemonic
from .utils.bip32 import HDWallet

from .utils.keys import PrivateKey, PublicKey
from .transactions.encode import create_transaction
from .transactions.decode import transaction_size_simple
from .broadcast import broadcast_transaction

from .generated import wallet_pb2

from .network import (
    BitcoinSegwitMainNet,
    BitcoinMainNet,
    BitcoinSegwitTestNet,
    BitcoinTestNet,
    LitecoinSegwitMainNet,
    LitecoinMainNet,
    LitecoinBTCSegwitMainNet,
    LitecoinBTCMainNet,
    LitecoinSegwitTestNet,
    LitecoinTestNet,
    EthereumMainNet,
    DogecoinMainNet,
    DogecoinBTCMainNet,
    DogecoinTestNet,
    DashMainNet,
    DashInvertedTestNet,
    DashBTCMainNet,
    DashTestNet,
    DashInvertedMainNet,
    BitcoinCashMainNet,
    BlockcypherTestNet,
)

from .address import CryptoClient

from .nodes.eth import eth_nodes

from .utils.aes import encrypt_str, decrypt_str

from .transaction import Transaction


def generate_mnemonic(strength=128):
    """Create a new English seed phrase from freshly generated entropy.

    Args:
        strength: Entropy size in bits. Must be a multiple of 32 and lie
            between 128 and 256 inclusive.

    Returns:
        The value produced by ``Mnemonic.to_mnemonic`` for ``strength // 8``
        bytes read from ``os.urandom``.

    Raises:
        ValueError: If ``strength`` is not a multiple of 32, or is outside
            the 128..256 range.
    """
    # The divisibility check runs first, so e.g. 100 reports "multiple of 32"
    # rather than the range error.
    if strength % 32:
        raise ValueError("strength must be a multiple of 32")
    if not 128 <= strength <= 256:
        raise ValueError("strength should be >= 128 and <= 256")

    random_bytes = urandom(strength // 8)
    return Mnemonic(language="english").to_mnemonic(random_bytes)
def create_wallet(
    mnemonic=None, network=BitcoinSegwitMainNet, strength=128
) -> HDWallet:
    """Build an ``HDWallet`` from a mnemonic, or from new random entropy.

    Deprecated: This function will be removed in v1.0.

    Args:
        mnemonic: The seed phrase to derive the wallet from. Do not use a
            phrase from a book or song, as that will be guessed and is not
            secure. When omitted (``None``), a random wallet is generated
            instead, which is the recommended usage.
        network: The network class to create this wallet for.
        strength: Entropy size in bits, forwarded to
            ``HDWallet.from_random``. Only used when ``mnemonic`` is ``None``.

    Returns:
        HDWallet: The wallet returned by ``HDWallet.from_mnemonic`` or
        ``HDWallet.from_random``.

    Usage:
        w = create_wallet(network=BitcoinSegwitMainNet)
    """
    if mnemonic is not None:
        return HDWallet.from_mnemonic(mnemonic=mnemonic, network=network)
    return HDWallet.from_random(strength=strength, network=network)
def create_keypair(network=BitcoinSegwitMainNet):
    """Generate a random private/public keypair.

    Args:
        network: The network class the keys are created for.

    Returns:
        tuple: ``(private_key, public_key)`` where ``private_key`` is a
        ``PrivateKey`` built from 32 bytes of ``os.urandom`` and
        ``public_key`` is its ``public_key`` attribute.

    Usage:
        prv, pub = create_keypair(network=BitcoinSegwitMainNet)
    """
    private_key = PrivateKey(urandom(32), network=network)
    return private_key, private_key.public_key
class Wallet:
    """Data class representing a cryptocurrency wallet.

    A wallet owns a ``wallet_pb2.Wallet`` protobuf (``self.container``) that
    stores the encrypted seed phrase, the derived addresses, endpoint
    configuration and cached transactions. It also holds the encrypted list
    of private keys (``self.encrypted_private_keys``) and a ``CryptoClient``
    (``self.client``) used for network queries.
    """

    @staticmethod
    def _network_enum_pairs():
        """Return ``(network class, wallet_pb2 enum value)`` pairs.

        Single source of truth for both directions of the mapping, used by
        ``__init__`` (class -> enum) and ``deserialize`` (enum -> class).
        """
        return (
            (BitcoinSegwitMainNet, wallet_pb2.BITCOIN_SEGWIT_MAINNET),
            (BitcoinMainNet, wallet_pb2.BITCOIN_MAINNET),
            (BitcoinSegwitTestNet, wallet_pb2.BITCOIN_SEGWIT_TESTNET),
            (BitcoinTestNet, wallet_pb2.BITCOIN_TESTNET),
            (LitecoinSegwitMainNet, wallet_pb2.LITECOIN_SEGWIT_MAINNET),
            (LitecoinMainNet, wallet_pb2.LITECOIN_MAINNET),
            (LitecoinBTCSegwitMainNet, wallet_pb2.LITECOIN_BTC_SEGWIT_MAINNET),
            (LitecoinBTCMainNet, wallet_pb2.LITECOIN_BTC_MAINNET),
            (LitecoinSegwitTestNet, wallet_pb2.LITECOIN_SEGWIT_TESTNET),
            (LitecoinTestNet, wallet_pb2.LITECOIN_TESTNET),
            (EthereumMainNet, wallet_pb2.ETHEREUM_MAINNET),
            (DogecoinMainNet, wallet_pb2.DOGECOIN_MAINNET),
            (DogecoinBTCMainNet, wallet_pb2.DOGECOIN_BTC_MAINNET),
            (DogecoinTestNet, wallet_pb2.DOGECOIN_TESTNET),
            (DashMainNet, wallet_pb2.DASH_MAINNET),
            (DashInvertedMainNet, wallet_pb2.DASH_INVERTED_MAINNET),
            (DashBTCMainNet, wallet_pb2.DASH_BTC_MAINNET),
            (DashTestNet, wallet_pb2.DASH_TESTNET),
            (DashInvertedTestNet, wallet_pb2.DASH_INVERTED_TESTNET),
            (BitcoinCashMainNet, wallet_pb2.BITCOIN_CASH_MAINNET),
            (BlockcypherTestNet, wallet_pb2.BLOCKCYPHER_TESTNET),
        )

    @staticmethod
    def _derive_keys(hdwallet, network, derivation_path, count, address_book=None):
        """Derive ``count`` receive keys and return them in serialized form.

        Keys are derived at ``{derivation_path}/0/{i}``. Each key is exported
        as hex on EVM networks and as WIF otherwise. When ``address_book`` is
        not ``None`` (a protobuf repeated ``addresses`` field), one entry per
        key is added to it with the address and hex public key.
        """
        exported = []
        for index in range(count):
            privkey = hdwallet.get_child_for_path(
                f"{derivation_path}/0/{index}"
            ).private_key
            # Identity check on purpose: an empty repeated field is falsy but
            # is exactly the case that must be filled.
            if address_book is not None:
                pubkey = privkey.public_key
                entry = address_book.add()
                entry.address = pubkey.address()
                entry.pubkey = pubkey.to_hex()
            exported.append(
                privkey.to_hex() if network.SUPPORTS_EVM else privkey.to_wif()
            )
        return exported

    def __init__(
        self,
        network,
        seed_phrase,
        password,
        receive_gap_limit=1000,
        change_gap_limit=1000,
        derivation_path=None,
        _with_wallet=True,
        max_cycles=100,
        **kwargs,
    ):
        """
        Initializes a Wallet object.

        Args:
            network: The network associated with the wallet.
            seed_phrase: The seed phrase for the wallet. A new one is
                generated with ``generate_mnemonic()`` when this is falsy.
            password: The password to encrypt the wallet.
            receive_gap_limit (int, optional): The number of receive
                addresses to derive. Defaults to 1000.
            change_gap_limit (int, optional): The gap limit for change
                addresses; only stored in the container here.
                Defaults to 1000.
            derivation_path (str, optional): The derivation path for the
                wallet. Defaults to the network's ``BIP32_SEGWIT_PATH``,
                falling back to ``BIP32_PATH``.
            _with_wallet (bool, optional): Internal. When False only the
                network is recorded and no container, keys or client are
                created (used by ``deserialize``).
            max_cycles (int, optional): Forwarded to ``CryptoClient``.
                Defaults to 100.
            fullnode_endpoints (list, optional): List of full node endpoints.
                Defaults to None.
            esplora_endpoints (list, optional): List of Esplora endpoints.
                Defaults to None.
            blockcypher_tokens (list, optional): List of Blockcypher tokens.
                Defaults to None.

        Raises:
            ValueError: If an unknown network is provided or if the
                derivation path is invalid.
        """
        fullnode_endpoints = kwargs.get("fullnode_endpoints")
        esplora_endpoints = kwargs.get("esplora_endpoints")
        blockcypher_tokens = kwargs.get("blockcypher_tokens")

        self._network = network
        derivation_path = derivation_path or (
            network.BIP32_SEGWIT_PATH or network.BIP32_PATH
        )
        seed_phrase = seed_phrase or generate_mnemonic()

        if not _with_wallet:
            return

        # Validate everything before touching the protobuf or deriving keys,
        # so bad input surfaces as the documented ValueError instead of a
        # protobuf type error after the expensive work has been done.
        if not isinstance(derivation_path, str):
            raise ValueError("Invalid derivation path")
        network_id = dict(self._network_enum_pairs()).get(network)
        if network_id is None:
            raise ValueError("Unknown network")

        self.container = wallet_pb2.Wallet()
        self.container.receive_gap_limit = receive_gap_limit
        self.container.change_gap_limit = change_gap_limit
        self.container.height = 0
        self.container.derivation_path = derivation_path
        self.container.network = network_id

        hdwallet = HDWallet.from_mnemonic(mnemonic=seed_phrase, network=network)

        # We do not save the password. Instead, the seed phrase and the
        # private keys are stored only in encrypted form (the original
        # author's note describes this as AES-256-SIV encryption).
        self.container.encrypted_seed_phrase = encrypt_str(seed_phrase, password)

        self.container.fullnode_endpoints.extend(fullnode_endpoints or [])
        self.container.esplora_endpoints.extend(esplora_endpoints or [])
        self.container.blockcypher_tokens.extend(blockcypher_tokens or [])

        # Plaintext keys stay in a local so they never sit on the instance,
        # even if derivation fails part-way through.
        plaintext_keys = self._derive_keys(
            hdwallet,
            network,
            derivation_path,
            receive_gap_limit,
            self.container.addresses,
        )
        self.encrypted_private_keys = encrypt_str(
            json.dumps(plaintext_keys), password
        )

        self._setup_client(max_cycles=max_cycles)

    @classmethod
    def deserialize(cls, data: bytes, password, max_cycles=100):
        """
        Deserialize a Wallet object from its byte representation.

        Args:
            data (bytes): The byte representation of the Wallet object, as
                produced by ``serialize``.
            password: The password used to encrypt the wallet.
            max_cycles (int, optional): Forwarded to ``CryptoClient``.
                Defaults to 100.

        Returns:
            Wallet: The deserialized Wallet object.

        Raises:
            ValueError: If an unknown network is encountered during
                deserialization.
        """
        container = wallet_pb2.Wallet()
        container.ParseFromString(data)

        seed_phrase = decrypt_str(container.encrypted_seed_phrase, password)

        enum_to_network = {
            enum_value: net for net, enum_value in cls._network_enum_pairs()
        }
        network = enum_to_network.get(container.network)
        if network is None:
            raise ValueError("Unknown network")

        wallet = cls(network, seed_phrase, password, _with_wallet=False)
        wallet.container = container

        hdwallet = HDWallet.from_mnemonic(mnemonic=seed_phrase, network=network)

        # Rebuild addresses only when the serialized wallet did not contain
        # them; otherwise keep what was stored.
        address_book = container.addresses if not container.addresses else None
        plaintext_keys = cls._derive_keys(
            hdwallet,
            network,
            container.derivation_path,
            container.receive_gap_limit,
            address_book,
        )
        # Private keys are not part of the serialized container, so they are
        # re-derived from the seed phrase and re-encrypted here.
        wallet.encrypted_private_keys = encrypt_str(
            json.dumps(plaintext_keys), password
        )

        wallet._setup_client(max_cycles=max_cycles)
        return wallet

    def network(self):
        """
        Get the network associated with the wallet.

        Returns:
            The network object that was passed to the constructor.
        """
        return self._network

    def _setup_client(self, max_cycles=100):
        """Create ``self.client`` from the endpoints stored in the container.

        Protobuf endpoint entries are converted to plain dicts that contain
        only the fields that are set. EVM networks enable the client's
        ``use_database`` option, and ETH falls back to the bundled
        ``eth_nodes`` when no full node is configured.
        """
        fullnode_endpoints = []
        for node_pb2 in self.container.fullnode_endpoints:
            node = {}
            if node_pb2.url:
                node["url"] = node_pb2.url
            if node_pb2.user:
                node["user"] = node_pb2.user
            if node_pb2.password:
                node["password"] = node_pb2.password
            fullnode_endpoints.append(node)

        esplora_endpoints = []
        for node_pb2 in self.container.esplora_endpoints:
            node = {}
            if node_pb2.url:
                node["url"] = node_pb2.url
            esplora_endpoints.append(node)

        blockcypher_tokens = list(self.container.blockcypher_tokens)

        use_database = False
        if self._network.SUPPORTS_EVM:
            use_database = True
            if not fullnode_endpoints and self._network.COIN == "ETH":
                fullnode_endpoints.extend(eth_nodes)

        self.client = CryptoClient(
            self.addresses(),
            coin=self._network.COIN,
            chain="test" if self._network.TESTNET else "main",
            transactions=self.container.transactions,
            max_cycles=max_cycles,
            fullnode_endpoints=fullnode_endpoints,
            esplora_endpoints=esplora_endpoints,
            blockcypher_tokens=blockcypher_tokens,
            use_database=use_database,
        )

    def get_transaction_history(self):
        """
        Get the transaction history associated with the wallet.

        The client's result is deduplicated by ``txid`` (first occurrence
        wins, order preserved) and written back to
        ``self.container.transactions`` as a cache.

        Returns:
            List[Transaction]: The list of transactions in the wallet's
            history.
        """
        seen_txids = set()
        unique = []
        for raw_tx in self.client.get_transaction_history():
            if raw_tx.txid not in seen_txids:
                seen_txids.add(raw_tx.txid)
                unique.append(raw_tx)

        del self.container.transactions[:]
        self.container.transactions.extend(unique)

        return [Transaction(raw_tx, self._network) for raw_tx in unique]

    def get_utxos(self, only_unspent=False):
        """
        Get the transaction outputs (UTXOs) associated with the wallet.

        Args:
            only_unspent (bool, optional): Forwarded to
                ``Transaction.sat_outputs`` to select which outputs are
                enumerated. Defaults to False.

        Returns:
            List[UTXO]: One ``UTXO`` per enumerated output that could be
            constructed with ``only_mine=True``.
        """
        addresses = self.addresses()
        transactions = self.get_transaction_history()

        utxo_set = []
        for tx in transactions:
            for index in range(len(tx.sat_outputs(only_unspent=only_unspent))):
                try:
                    utxo_set.append(
                        UTXO(
                            tx,
                            index,
                            addresses=addresses,
                            other_transactions=transactions,
                            only_mine=True,
                        )
                    )
                except ValueError:
                    # Deliberate skip: presumably UTXO rejects outputs that
                    # do not belong to this wallet - TODO confirm in UTXO.
                    pass
        return utxo_set

    def _to_human_friendly_utxo(self, inputs, private_keys):
        """Attach the matching private key to each input and drop the rest.

        Args:
            inputs: UTXO objects whose ``_output["address"]`` is compared
                against the addresses derived from ``private_keys``.
            private_keys: WIF-encoded keys, as ``str`` (what
                ``private_keys()`` returns) or ``bytes``.

        Returns:
            list: The inputs for which a key was found, in their original
            order, each with ``_output["private_key"]`` and
            ``_output["address_hash"]`` set.
        """
        if not inputs:
            return []

        # Derive every key once and index it by each address form it can
        # produce, instead of re-deriving all keys for every input.
        key_by_address = {}
        for raw_key in private_keys:
            wif = (
                raw_key.decode()
                if isinstance(raw_key, (bytes, bytearray))
                else raw_key
            )
            privkey = PrivateKey.from_wif(wif, self._network)
            pubkey = privkey.public_key
            candidates = [
                pubkey.base58_address(True),
                pubkey.base58_address(False),
            ]
            try:
                candidates.append(pubkey.bech32_address())
            except Exception:
                # Best effort, as in the original: a failing bech32
                # derivation just means only base58 forms are matched.
                pass
            for candidate in candidates:
                # setdefault keeps the first key in list order on collisions.
                key_by_address.setdefault(candidate, privkey)

        spendable = []
        for utxo in inputs:
            privkey = key_by_address.get(utxo._output["address"])
            utxo._output["private_key"] = privkey
            if privkey is None:
                continue
            utxo._output["address_hash"] = privkey.public_key.hash160()
            spendable.append(utxo)
        return spendable

    def get_balance(self, in_standard_units=True):
        """
        Get the balance of the wallet.

        Args:
            in_standard_units (bool, optional): If True, balance is returned
                in standard units (e.g., BTC, ETH). If False, balance is
                returned in raw units (e.g., satoshi, wei). Defaults to True.

        Returns:
            Tuple[float, float]: The total balance and confirmed balance of
            the wallet.
        """
        if self._network.SUPPORTS_EVM:
            # We must use the Web3 network to get the balance as UTXOs are
            # not available and getting transaction history of an address is
            # impractically slow.
            balance = self.client.get_balance()
            if not in_standard_units:
                return balance
            return balance[0] / 1e18, balance[1] / 1e18

        # Not an EVM chain.
        # NOTE(review): "confirmed" is the sum over get_utxos(only_unspent=True)
        # and "total" the sum over get_utxos(); neither looks at block height,
        # unlike create_transaction. Confirm this is the intended definition.
        confirmed_balance = 0
        for utxo in self.get_utxos(only_unspent=True):
            confirmed_balance += utxo.amount(in_standard_units=in_standard_units)

        total_balance = 0
        for utxo in self.get_utxos():
            total_balance += utxo.amount(in_standard_units=in_standard_units)

        return total_balance, confirmed_balance

    def addresses(self):
        """
        Get the addresses associated with the wallet.

        Returns:
            List[str]: The list of addresses associated with the wallet.
        """
        return [entry.address for entry in self.container.addresses]

    def random_address(self):
        """
        Get a random address from the wallet.

        Every address is equally likely. A secure RNG is used to resist
        blockchain analysis.

        Returns:
            str: A randomly selected address from the wallet.

        Raises:
            ValueError: If the wallet has no addresses.
        """
        import secrets

        addresses = self.addresses()
        if not addresses:
            raise ValueError("Wallet has no addresses")
        # The previous hand-rolled sampler floored the byte count, so it
        # always returned the first address for wallets with fewer than 256
        # addresses and never picked an index above 255 for larger ones.
        return secrets.choice(addresses)

    def private_keys(self, password):
        """
        Get the private keys associated with the wallet.

        Args:
            password: The password used to encrypt the wallet.

        Returns:
            List[str]: The list of private keys associated with the wallet
            (hex on EVM networks, WIF otherwise).

        Raises:
            ValueError: Propagated from decryption or JSON decoding.
        """
        return json.loads(decrypt_str(self.encrypted_private_keys, password))

    def _add_stock_nodes(self):
        """Return full node endpoints as ``{"url": ...}`` dicts.

        On ETH, the bundled ``eth_nodes`` are appended after the configured
        endpoints.
        """
        # NOTE(review): unlike _setup_client, user/password are not copied
        # here - confirm that is intended.
        fullnode_endpoints = [
            {"url": endpoint.url} for endpoint in self.container.fullnode_endpoints
        ]
        if self._network.SUPPORTS_EVM and self._network.COIN == "ETH":
            fullnode_endpoints.extend(eth_nodes)
        return fullnode_endpoints

    def _calculate_change(self, inputs, destinations, fee_rate):
        """Work out the change output for a draft transaction.

        Args:
            inputs: The UTXOs being spent.
            destinations: All outputs, including a zero-amount placeholder
                for the change output so the size estimate accounts for it.
            fee_rate: Fee per size unit as reported by
                ``transaction_size_simple``.

        Returns:
            A ``Destination`` paying the change to a random wallet address,
            or ``None`` when there is no change left over.

        Raises:
            ValueError: If the inputs cannot cover the outputs plus the fee.

        Side effects:
            When the inputs fall short and some destinations use
            ``FeePolicy.PROPORTIONAL``, the fee is split evenly (rounded up)
            between them by reducing their ``_amount`` in place.
        """
        draft = create_transaction(inputs, destinations, network=self._network)
        fee = transaction_size_simple(draft) * fee_rate

        total_inputs = sum(i.amount(in_standard_units=False) for i in inputs)
        total_outputs = sum(o.amount(in_standard_units=False) for o in destinations)

        if total_inputs < total_outputs + fee:
            fee_payers = [
                o for o in destinations if o.fee_policy() == FeePolicy.PROPORTIONAL
            ]
            if not fee_payers:
                raise ValueError("Not enough balance for this transaction")

            share = int(math.ceil(fee / len(fee_payers)))
            for payer in fee_payers:
                payer._amount -= share

            # If after applying proportional fee scaling we STILL don't have
            # enough balance, then the outputs are greater than the inputs
            # (possibly a dust input set).
            total_outputs = sum(
                o.amount(in_standard_units=False) for o in destinations
            )
            if total_inputs < total_outputs + fee:
                raise ValueError(
                    "Not enough balance for this transaction "
                    "(are you trying to send dust amounts?)"
                )

        change = total_inputs - total_outputs - fee
        if change <= 0:
            return None
        # Destination is given the amount divided by 1e8, as in the original.
        return Destination(self.random_address(), change / 1e8, self._network)

    # Fee rate is in the unit used by the network, ie. vbytes, bytes or wei
    def create_transaction(
        self,
        password: bytes,
        destinations: "List[Destination]",
        fee_rate=None,
        spend_unconfirmed_inputs=False,
        **kwargs,
    ):
        """
        Create a transaction.

        Args:
            password (bytes): The password used to encrypt the wallet.
            destinations (List[Destination]): The list of destinations for
                the transaction. The list itself is not modified.
            fee_rate: The fee rate for the transaction. Required on
                UTXO-based networks.
            spend_unconfirmed_inputs (bool, optional): If True, spend
                unconfirmed inputs. Defaults to False.
            from_address (str, optional): EVM networks only. The wallet
                address to send from. Defaults to the first address.
            **kwargs: EVM networks only. Remaining keyword arguments are
                forwarded to the transaction encoder.

        Returns:
            Transaction: The created transaction.

        Raises:
            ValueError: If no destination is given or ``from_address`` is not
                a wallet address (EVM), if ``fee_rate`` is missing (UTXO), or
                if the balance is insufficient.
        """
        fullnode_endpoints = self._add_stock_nodes()

        if self._network.SUPPORTS_EVM:
            if not destinations:
                raise ValueError("Must specify at least one destination")

            addresses = self.addresses()
            sender_address = kwargs.pop("from_address", None) or addresses[0]
            try:
                sender_index = addresses.index(sender_address)
            except ValueError as e:
                raise ValueError(
                    "from_address does not belong to this wallet"
                ) from e

            private_key = self.private_keys(password)[sender_index]
            # EVM chains have no UTXOs; a placeholder input carries the
            # sender's address and key to the encoder.
            pseudo_input = UTXO(
                None,
                None,
                _internal_param_do_not_use={
                    "address": sender_address,
                    "private_key": private_key,
                    "amount": 0,
                    "height": 0,
                },
                _network=self._network,
            )
            return create_transaction(
                [pseudo_input],
                destinations,
                network=self._network,
                full_nodes=fullnode_endpoints,
                **kwargs,
            )

        # Fail fast, before any network round-trips; previously a missing
        # fee rate surfaced as a TypeError deep inside the fee arithmetic.
        if fee_rate is None:
            raise ValueError("fee_rate is required for UTXO-based networks")

        inputs = self.get_utxos(only_unspent=True)
        if not spend_unconfirmed_inputs:
            # Inputs with a falsy height are treated as unconfirmed.
            inputs = [utxo for utxo in inputs if utxo.height()]

        inputs = self._to_human_friendly_utxo(inputs, self.private_keys(password))

        # Depending on the size of the transaction, we may need to add a
        # change output. Otherwise, the remaining balance goes to the miner.
        # The placeholder is not the real change output; it only makes the
        # size estimate include one.
        placeholder = Destination(self.random_address(), 0, self._network)
        change = self._calculate_change(
            inputs, [*destinations, placeholder], fee_rate
        )

        # Build a new list so the caller's ``destinations`` is not mutated.
        outputs = list(destinations)
        if change:
            outputs.append(change)
        return create_transaction(inputs, outputs, network=self._network)

    def broadcast_transaction(self, transaction):
        """
        Broadcast a transaction to the network.

        Args:
            transaction: The transaction to broadcast.
        """
        broadcast_transaction(transaction, self._network)

    def serialize(self):
        """
        Serialize the wallet object.

        Returns:
            bytes: The serialized representation of the wallet's protobuf
            container.
        """
        return self.container.SerializeToString()