import binascii
import hashlib
import web3
from web3.gas_strategies.time_based import fast_gas_price_strategy
from ..network import BitcoinSegwitMainNet
from ..utils.keys import PrivateKey
from ..utxo import UTXO
from ..destination import Destination
from ..utils.base58 import b58decode_check, is_b58check
from ..utils.keccak import to_checksum_address
# Keep the typing import until the module is migrated to builtin collection annotations consistently.
from typing import List
SIGHASH_ALL = 1
[docs]def script_is_p2pkh(script):
return (
len(script) == 25
and script[0:3] == b"\x76\xa9\x14"
and script[23:25] == b"\x88\xac"
)
[docs]def script_is_p2sh(script):
return len(script) == 23 and script[0:2] == b"\xa9\x14" and script[22] == 0x87
[docs]def script_is_p2wpkh(script):
return len(script) == 22 and script[0:2] == b"\x00\x14"
[docs]def script_is_p2wsh(script):
return len(script) == 34 and script[0:2] == b"\x00\x20"
[docs]def int_to_hex(i, min_bytes=1):
return i.to_bytes(max(min_bytes, (i.bit_length() + 7) // 8), byteorder="little")
[docs]def create_varint(value):
if value < 0xFD:
return int_to_hex(value, min_bytes=1)
elif value <= 0xFFFF:
return b"\xfd" + int_to_hex(value, min_bytes=2)
elif value <= 0xFFFFFFFF:
return b"\xfe" + int_to_hex(value, min_bytes=4)
else:
return b"\xff" + int_to_hex(value, min_bytes=8)
[docs]def assemble_legacy_signature(
bytes_1,
bytes_2_inputs,
bytes_3,
bytes_4,
network,
b2i,
):
# Remove segwit signalling bytes if present
if bytes_1[-3:-1] == b"\x00\x01":
bytes_1 = bytes_1[:-3] + bytes_1[-1:]
b2 = bytes_2_inputs[b2i]
script_pubkey = b2[3]
private_key = b2[4]
sighash = b2[5]
keyhash = b2[6]
partial_transaction = bytes_1
for i in range(0, len(bytes_2_inputs)):
partial_transaction += bytes_2_inputs[i][0]
if i == b2i:
partial_transaction += create_varint(len(script_pubkey)) + script_pubkey
else:
partial_transaction += bytes_2_inputs[i][1] # The empty scriptsig
partial_transaction += bytes_2_inputs[i][2]
partial_transaction += bytes_3
partial_transaction += bytes_4
# And last, the input's sighash must be placed AT THE END of the temporary transaction
partial_transaction += int_to_hex(sighash, 4)
# We are actually supposed to hash the partial transaction twice.
# However, coincurve ALWAYS hashes the message before signing, and if we disable the
# hasher then it throws a tantrum.
# So we only hash it one time here.
hashed_preimage = hashlib.sha256(partial_transaction).digest()
# Sign it
pubkey = private_key.public_key.to_bytes()
if private_key.public_key.hash160(compressed=False) == keyhash:
pubkey = private_key.public_key.to_bytes(False)
der = private_key.der_sign(hashed_preimage) + bytes([sighash])
# I would like to mention that this only works if the data being pushed is less than 76 bytes.
# Otherwise we need to use OP_PUSHDATA<1/2/4>
if network.ADDRESS_MODE:
script = int_to_hex(len(der)) + der + int_to_hex(len(pubkey)) + pubkey
else:
# P2PK has no pubkey
script = int_to_hex(len(der)) + der
return create_varint(len(script)) + script
[docs]def assemble_segwit_payload(
i, inputs, nsequence, outputs, nlocktime="00000000", sighash=SIGHASH_ALL
):
# nVersion of the transaction (4-byte little endian)
segwit_payload = int_to_hex(1, 4)
# hash_prevouts (32-byte hash)
hash_prevouts = b""
for j in inputs:
hash_prevouts += binascii.unhexlify(j.txid().encode())[::-1]
hash_prevouts += int_to_hex(j.index(), 4)
segwit_payload += hashlib.sha256(hashlib.sha256(hash_prevouts).digest()).digest()
# hash_sequence (32-byte hash)
hash_sequence = b""
for j in inputs:
nsequence = j._nsequence()
if isinstance(nsequence, str):
nsequence = bytes.fromhex(nsequence)
hash_sequence += nsequence
segwit_payload += hashlib.sha256(hashlib.sha256(hash_sequence).digest()).digest()
# outpoint (32-byte hash + 4-byte little endian)
segwit_payload += binascii.unhexlify(i.txid().encode())[::-1]
segwit_payload += int_to_hex(i.index(), 4)
# scriptCode of the input (serialized as scripts inside CTxOuts)
# note: for p2wpkh this is actually the P2PKH script!!!
# for p2wsh it is the original script
if i._private_key().public_key.hashonly:
script = i._private_key().public_key.script() # p2wsh
else:
script = i._private_key().public_key.p2pkh_script()
segwit_payload += create_varint(len(script)) + script
# value of the output spent by this input (8-byte little endian)
segwit_payload += int_to_hex(i.amount(in_standard_units=False), 8)
# nSequence of the input (4-byte little endian)
segwit_payload += nsequence
# hashOutputs (32-byte hash)
segwit_payload += hashlib.sha256(hashlib.sha256(outputs).digest()).digest()
# nLocktime of the transaction (4-byte little endian)
segwit_payload += bytes.fromhex(nlocktime)
# sighash type of the signature (4-byte little endian)
segwit_payload += int_to_hex(sighash, 4)
return segwit_payload
[docs]def create_signatures_legacy(bytes_1, bytes_2_inputs, bytes_3, bytes_4):
# Signs the inputs of a legacy transaction. The parts are contained in bytes 1, 2, 3, 4.
# Bytes 2 contains the inputs broken up so that the signature is isolated. It also has
# the script pubkey, the private key, and sighash.
# Note that Segwit transactions use a different signing format (see BIP 143).
signatures = []
# Note that only ONE INPUT IS FILLED AT A TIME DURING SIGNING
for b2i in range(0, len(bytes_2_inputs)):
b2 = bytes_2_inputs[b2i]
signatures.append(
assemble_legacy_signature(
bytes_1, bytes_2_inputs, bytes_3, bytes_4, b2[8], b2i
)
)
# Now that we have all the signatures, we can assemble the signed transaction
signed_transaction = bytes_1
for i in range(0, len(bytes_2_inputs)):
signed_transaction += bytes_2_inputs[i][0]
signed_transaction += signatures[i]
signed_transaction += bytes_2_inputs[i][2]
signed_transaction += bytes_3
signed_transaction += bytes_4
return signed_transaction.hex()
[docs]def create_signatures_segwit(bytes_1, bytes_2_inputs, bytes_3, bytes_4):
# Signs the inputs of a segwit transaction. The parts are contained in bytes 1, 2, 3, 4.
# Bytes 2 contains the inputs broken up so that the signature is isolated. It also has
# the script pubkey, the private key, and sighash.
#
# The partial transaction to sign is a double SHA256 of the serialization of:
# 1. nVersion of the transaction (4-byte little endian)
# 2. hash_prevouts (32-byte hash)
# 3. hash_sequence (32-byte hash)
# 4. outpoint (32-byte hash + 4-byte little endian)
# 5. scriptCode of the input (serialized as scripts inside CTxOuts)
# 6. value of the output spent by this input (8-byte little endian)
# 7. nSequence of the input (4-byte little endian)
# 8. hashOutputs (32-byte hash)
# 9. nLocktime of the transaction (4-byte little endian)
# 10. sighash type of the signature (4-byte little endian)
signatures = []
witness_stack = []
# Note that only ONE INPUT IS FILLED AT A TIME DURING SIGNING
for b2i in range(0, len(bytes_2_inputs)):
b2 = bytes_2_inputs[b2i]
segwit_payload = None if not b2[7] else hashlib.sha256(b2[7]).digest()
# Sign it
if not segwit_payload:
signatures.append(
assemble_legacy_signature(
bytes_1, bytes_2_inputs, bytes_3, bytes_4, b2[8], b2i
)
)
witness_stack.append([])
else:
# It's a segwit input, sign it and place it on the witness stack.
# Note: ZPyWallet doesn't support signing with non-standard
# uncompressed segwit pubkeys.
private_key = b2[4]
sighash = b2[5]
pubkey = private_key.public_key.to_bytes()
der = private_key.der_sign(segwit_payload) + bytes([sighash])
witness_stack.append([der, pubkey])
signatures.append(b"\x00")
# Now that we have all the signatures, we can assemble the signed transaction
signed_transaction = bytes_1
for i in range(0, len(bytes_2_inputs)):
signed_transaction += bytes_2_inputs[i][0]
signed_transaction += signatures[i]
signed_transaction += bytes_2_inputs[i][2]
signed_transaction += bytes_3
# Assemble the witness stack, one per input, segwit inputs only
for w in witness_stack:
signed_transaction += create_varint(len(w))
witness_bytes = b""
for w_elem in w:
witness_bytes += create_varint(len(w_elem)) + w_elem
signed_transaction += witness_bytes
signed_transaction += bytes_4
return signed_transaction.hex()
[docs]def create_transaction(
inputs: List[UTXO],
outputs: List[Destination],
rbf=True,
network=BitcoinSegwitMainNet,
full_nodes=None,
**kwargs
):
"""
Creates a signed Bitcoin transaction.
Usually you do not want to use this function, because it does not
automatically calculate change or fees. Instead, use
Wallet.create_transaction().
Args:
inputs (List[UTXO]): A list of Unspent Transaction Output objects.
outputs (List[Destination]): A list of Destination objects representing
the transaction outputs.
rbf (bool, optional): Whether to enable Replace-By-Fee. Defaults to True.
Only for Bitcoin-like blockchains. <Deprecated>
network (CryptoNetwork, optional): The network to use. Defaults to
BitcoinSegwitMainNet.
full_nodes (list, optional): List of Web3 nodes to connect for signing.
Only for EVM blockchains.
gas (int, optional): Specifies the gas in Gwei. Only for EVM blockchains.
Returns:
bytes: The signed transaction data in bytes.
Raises:
ValueError: If there's an issue with the transaction or network configuration.
"""
# First, construct the raw transacation
if network.SUPPORTS_EVM:
if len(inputs) != 1:
raise ValueError("EVM transactions require exactly one funded input")
if len(outputs) != 1:
raise ValueError("EVM transactions support exactly one destination")
return create_web3_transaction(
inputs[0].address(),
outputs[0].address(),
outputs[0].amount(in_standard_units=False),
inputs[0]._private_key(),
full_nodes,
kwargs.get("gas"),
network.CHAIN_ID,
nonce=kwargs.get("nonce"),
data=kwargs.get("data"),
gas_price=kwargs.get("gas_price"),
max_fee_per_gas=kwargs.get("max_fee_per_gas"),
max_priority_fee_per_gas=kwargs.get("max_priority_fee_per_gas"),
)
legacy = []
for i in inputs:
is_legacy = is_b58check(i.address())
legacy.append(is_legacy)
all_legacy = all(legacy)
if not all_legacy and not network.SUPPORTS_SEGWIT:
raise ValueError("You must use a segwit network to use bech32 inputs")
tx_bytes_1 = tx_bytes_3 = tx_bytes_4 = b""
tx_bytes_1 += int_to_hex(1, 4) # Version 1 transaction
if network.SUPPORTS_SEGWIT and not all_legacy:
tx_bytes_1 += b"\x00\x01" # Signal segwit support
# We process the outputs before the inputs so that we can use it for segwit transactions.
tx_bytes_3 += create_varint(len(outputs))
tx_bytes_3a = b""
for o in outputs:
tx_bytes_3b = b""
tx_bytes_3b += int_to_hex(o.amount(in_standard_units=False), 8)
# if network.SUPPORTS_SEGWIT and o.script_pubkey()[0] == 0:
# script = b"\x76\xa9" + o.script_pubkey()[1:] + b"\x88\xac"
# tx_bytes_3b += create_varint(len(script)) + script
# else:
script = o.script_pubkey()
tx_bytes_3b += create_varint(len(script))
tx_bytes_3b += script
tx_bytes_3a += tx_bytes_3b
tx_bytes_3 += tx_bytes_3a
# Inputs
tx_bytes_1 += create_varint(len(inputs))
tx_bytes_2_inputs = []
for num in range(len(inputs)):
i = inputs[num]
input_bytes_1 = input_bytes_2 = input_bytes_3 = b""
input_bytes_1 += binascii.unhexlify(i.txid().encode())[::-1]
input_bytes_1 += int_to_hex(i.index(), 4)
# The transacion cannot be signed until it is fully constructed.
# To avoid a chicken-and-egg, we set the signature scripts to empty.
# This is the prescribed behavior by the bitcoin protocol.
# EDIT I heard it's just the scriptpubkey
input_bytes_2 = (
b"\x00" # create_varint(len(i._script_pubkey())) + i._script_pubkey()
)
input_bytes_3 = int_to_hex(
0xFFFFFFFD, 4
) # see https://bitcointalk.org/index.php?topic=5479540.msg63401889#msg63401889
inputs[num]._output["nsequence"] = input_bytes_3
segwit_payload = b""
# It is easier to prepare the Segwit signing data here.
is_legacy = legacy.pop(0)
if network.SUPPORTS_SEGWIT and not is_legacy:
segwit_payload = assemble_segwit_payload(
i, inputs, input_bytes_3, tx_bytes_3a
)
# If this is a segwit transaction these will need to go into witness data eventually.
tx_bytes_2_inputs.append(
[
input_bytes_1,
input_bytes_2,
input_bytes_3,
i._private_key().public_key.script(),
i._private_key(),
SIGHASH_ALL,
i._addresshash(),
segwit_payload,
i.network(),
]
)
# tx_bytes_3 should also contain the witness data
tx_bytes_4 += int_to_hex(0, 4) # Disable locktime (redundant)
if network.SUPPORTS_SEGWIT and not all_legacy:
return create_signatures_segwit(
tx_bytes_1, tx_bytes_2_inputs, tx_bytes_3, tx_bytes_4
)
return create_signatures_legacy(
tx_bytes_1, tx_bytes_2_inputs, tx_bytes_3, tx_bytes_4
)
def _normalize_signed_web3_transaction(signed_transaction):
if isinstance(signed_transaction, dict):
for key in ("raw_transaction", "rawTransaction"):
if key in signed_transaction:
return _normalize_signed_web3_transaction(signed_transaction[key])
for attr in ("raw_transaction", "rawTransaction"):
if hasattr(signed_transaction, attr):
return _normalize_signed_web3_transaction(
getattr(signed_transaction, attr)
)
if isinstance(signed_transaction, str):
normalized = (
signed_transaction[2:]
if signed_transaction.startswith("0x")
else signed_transaction
)
bytes.fromhex(normalized)
return normalized
if isinstance(signed_transaction, (bytes, bytearray)):
raw_bytes = bytes(signed_transaction)
if raw_bytes.startswith(b"0x"):
try:
return bytes.fromhex(raw_bytes[2:].decode()).hex()
except ValueError:
pass
try:
decoded = raw_bytes.decode()
except UnicodeDecodeError:
return raw_bytes.hex()
normalized = decoded[2:] if decoded.startswith("0x") else decoded
try:
bytes.fromhex(normalized)
except ValueError:
return raw_bytes.hex()
return normalized
raise TypeError("Unsupported signed web3 transaction payload")
[docs]def create_web3_transaction(
a_from,
a_to,
amount,
private_key,
fullnodes,
gas,
chain_id,
nonce=None,
data=None,
gas_price=None,
max_fee_per_gas=None,
max_priority_fee_per_gas=None,
):
def add_web3_cache_middleware(middleware_onion):
for middleware_name in (
"time_based_cache_middleware",
"latest_block_based_cache_middleware",
"simple_cache_middleware",
):
middleware_factory = getattr(web3.middleware, middleware_name, None)
if middleware_factory is not None:
middleware_onion.add(middleware_factory)
def normalize_private_key(key):
if isinstance(key, bytes):
return key
if isinstance(key, str):
key = key[2:] if key.startswith("0x") else key
return bytes.fromhex(key)
return bytes(key)
def maybe_call(value):
return value() if callable(value) else value
def normalize_data(payload):
if payload is None:
return None
if isinstance(payload, bytes):
return payload
if isinstance(payload, bytearray):
return bytes(payload)
if isinstance(payload, str):
normalized = payload[2:] if payload.startswith("0x") else payload
return bytes.fromhex(normalized)
raise TypeError("data must be bytes or a hex string")
def get_latest_block(w3):
get_block = getattr(w3.eth, "get_block", None)
if get_block is None:
get_block = w3.eth.getBlock
return get_block("latest")
has_legacy_fee = gas_price is not None
has_eip1559_fee = (
max_fee_per_gas is not None or max_priority_fee_per_gas is not None
)
if has_legacy_fee and has_eip1559_fee:
raise ValueError(
"Cannot mix gas_price with EIP-1559 fee fields in one transaction"
)
if has_eip1559_fee and (
max_fee_per_gas is None or max_priority_fee_per_gas is None
):
raise ValueError(
"Both max_fee_per_gas and max_priority_fee_per_gas are required together"
)
def build_fee_fields(w3):
if has_legacy_fee:
return {"gasPrice": int(gas_price)}
if has_eip1559_fee:
return {
"maxFeePerGas": int(max_fee_per_gas),
"maxPriorityFeePerGas": int(max_priority_fee_per_gas),
}
try:
max_priority_fee = maybe_call(
getattr(w3.eth, "max_priority_fee", None)
)
if max_priority_fee is None:
max_priority_fee = maybe_call(
getattr(w3.eth, "maxPriorityFee", None)
)
latest_block = get_latest_block(w3)
base_fee = latest_block.get("baseFeePerGas")
if base_fee is not None and max_priority_fee is not None:
max_priority_fee = int(max_priority_fee)
return {
"maxFeePerGas": int(base_fee) * 2 + max_priority_fee,
"maxPriorityFeePerGas": max_priority_fee,
}
except Exception:
pass
resolved_gas_price = maybe_call(getattr(w3.eth, "gas_price", None))
if resolved_gas_price is None:
resolved_gas_price = maybe_call(getattr(w3.eth, "gasPrice", None))
if resolved_gas_price is None and hasattr(w3.eth, "generate_gas_price"):
resolved_gas_price = maybe_call(w3.eth.generate_gas_price)
if resolved_gas_price is None:
raise RuntimeError("Unable to determine gas price")
return {"gasPrice": int(resolved_gas_price)}
sender_address = a_from
receiver_address = a_to
normalized_private_key = normalize_private_key(private_key)
normalized_data = normalize_data(data)
expected_sender = web3.Account.from_key(normalized_private_key).address
if to_checksum_address(sender_address) != to_checksum_address(expected_sender):
raise ValueError("Sender address does not match private key")
# All amounts are in WEI not Ether
# Check the nonce for the sender address
if not fullnodes:
raise RuntimeError("Cannot sign web3 transaction without any full nodes")
last_error = None
for node in fullnodes:
try:
w3 = web3.Web3(web3.HTTPProvider(node["url"]))
# This makes it fetch max<priority>feepergas info faster
w3.eth.set_gas_price_strategy(fast_gas_price_strategy)
add_web3_cache_middleware(w3.middleware_onion)
resolved_nonce = nonce
if resolved_nonce is None:
nonce_method = getattr(w3.eth, "get_transaction_count", None)
if nonce_method is None:
nonce_method = w3.eth.getTransactionCount
resolved_nonce = nonce_method(to_checksum_address(sender_address))
# Build the transaction dictionary
transaction = {
"nonce": int(resolved_nonce),
"value": int(amount),
"chainId": chain_id,
}
if receiver_address:
transaction["to"] = to_checksum_address(receiver_address)
if normalized_data is not None:
transaction["data"] = normalized_data
# OK now calculate the gas
if not gas:
estimate_transaction = dict(
transaction, **{"from": to_checksum_address(sender_address)}
)
gas = w3.eth.estimate_gas(estimate_transaction)
transaction["gas"] = int(gas)
transaction.update(build_fee_fields(w3))
# Sign the transaction
sign_method = getattr(w3.eth.account, "sign_transaction", None)
if sign_method is None:
sign_method = w3.eth.account.signTransaction
return _normalize_signed_web3_transaction(
sign_method(transaction, normalized_private_key)
)
except Exception as e:
last_error = e
if last_error is None:
raise RuntimeError("Cannot sign web3 transaction (try specifying different nodes)")
raise RuntimeError(
"Cannot sign web3 transaction (try specifying different nodes)"
) from last_error