# -*- coding: utf-8 -*-
"""
The keccak module provides an implementation of the Keccak hash function, including various parameter presets
such as Keccak224, Keccak256, Keccak384, and Keccak512. The module offers a hashlib-compatible interface for
easy integration into existing codebases.
"""
from math import log
from operator import xor
from copy import deepcopy
from functools import reduce
from binascii import hexlify
# Iota-step constants for Keccak-f: keccak_f hands RoundConstants[i] to round i,
# where f_round XORs it into lane (0, 0).
RoundConstants = [
    0x0000000000000001,  # round 0
    0x0000000000008082,  # round 1
    0x800000000000808A,  # round 2
    0x8000000080008000,  # round 3
    0x000000000000808B,  # round 4
    0x0000000080000001,  # round 5
    0x8000000080008081,  # round 6
    0x8000000000008009,  # round 7
    0x000000000000008A,  # round 8
    0x0000000000000088,  # round 9
    0x0000000080008009,  # round 10
    0x000000008000000A,  # round 11
    0x000000008000808B,  # round 12
    0x800000000000008B,  # round 13
    0x8000000000008089,  # round 14
    0x8000000000008003,  # round 15
    0x8000000000008002,  # round 16
    0x8000000000000080,  # round 17
    0x000000000000800A,  # round 18
    0x800000008000000A,  # round 19
    0x8000000080008081,  # round 20
    0x8000000000008080,  # round 21
    0x0000000080000001,  # round 22
    0x8000000080008008,  # round 23
]
# Left-rotation amounts for the rho step of Keccak-f. f_round looks the
# amount up as RotationConstants[y][x] for the lane stored at a[x][y].
RotationConstants = [
    [0, 1, 62, 28, 27],
    [36, 44, 6, 55, 20],
    [3, 10, 43, 25, 39],
    [41, 45, 15, 21, 8],
    [18, 2, 61, 56, 14],
]
# Masks[n] is the integer with the n lowest bits set, for n = 0..64.
# The circular rotation helpers use it to isolate the bits that wrap around.
Masks = [2 ** nbits - 1 for nbits in range(65)]
def bits2bytes(x):
    """Return how many bytes are needed to hold ``x`` bits, rounding up.

    ``x`` is converted with ``int()`` before the computation.
    """
    whole_bytes, leftover_bits = divmod(int(x), 8)
    if leftover_bits:
        return whole_bytes + 1
    return whole_bytes
def rol(value, left, bits):
    """Circularly rotate ``value`` to the left within a ``bits``-wide word.

    Args:
        value: The integer to rotate; only its low ``bits`` bits are used.
        left: Number of positions to rotate by. It is reduced modulo
            ``bits``, so amounts equal to or larger than the word size are
            valid (a plain shift-based rotation would otherwise end up with
            a negative shift count and raise ``ValueError``).
        bits: Width of the word in bits (must be positive).

    Returns:
        The rotated value, always in the range ``0 .. 2**bits - 1``.
    """
    left %= bits
    word_mask = (1 << bits) - 1
    value &= word_mask
    # Bits shifted out at the top re-enter at the bottom.
    return ((value << left) | (value >> (bits - left))) & word_mask
def ror(value, right, bits):
    """Circularly rotate ``value`` to the right within a ``bits``-wide word.

    Args:
        value: The integer to rotate; only its low ``bits`` bits are used.
        right: Number of positions to rotate by. It is reduced modulo
            ``bits``, so amounts equal to or larger than the word size are
            valid (a plain shift-based rotation would otherwise end up with
            a negative shift count and raise ``ValueError``).
        bits: Width of the word in bits (must be positive).

    Returns:
        The rotated value, always in the range ``0 .. 2**bits - 1``.
    """
    right %= bits
    word_mask = (1 << bits) - 1
    value &= word_mask
    # Bits shifted out at the bottom re-enter at the top.
    return ((value >> right) | (value << (bits - right))) & word_mask
def multirate_padding(used_bytes, align_bytes):
    """Build the Keccak multi-rate padding for a partially filled block.

    Args:
        used_bytes: Number of message bytes already in the block.
        align_bytes: Block size in bytes that the padded data must reach.

    Returns:
        A list of byte values (ints) to append after the message bytes.
    """
    # A block with no room left still has to be padded, so a complete
    # extra block of padding is produced in that case.
    padlen = (align_bytes - used_bytes) or align_bytes
    # note: padding done in 'internal bit ordering', wherein LSB is leftmost
    if padlen == 1:
        # The leading and trailing padding bits share the single byte.
        return [0x81]
    filler = [0x00] * (padlen - 2)
    return [0x01] + filler + [0x80]
def f_round(state, a, rc):
    """Apply one Keccak-f round (theta, rho, pi, chi, iota) to ``a`` in place.

    Args:
        state: Object providing the table geometry (``W``, ``H``,
            ``range_w``, ``range_h``), the lane width in bits (``lanew``)
            and a ``zero()`` factory for a fresh table.
        a: The 5x5 table of lane integers, indexed ``a[x][y]``. It is
            mutated in place.
        rc: The round constant for this round.

    Rotation offsets are reduced modulo the lane width and the round
    constant is truncated to the lane width, so lanes narrower than 64 bits
    are handled as well; for 64-bit lanes both adjustments are no-ops.
    """
    W, H = state.W, state.H
    range_w, range_h = state.range_w, state.range_h
    lanew = state.lanew
    zero = state.zero
    lane_mask = (1 << lanew) - 1

    # theta
    C = [reduce(xor, a[x]) for x in range_w]
    D = [0] * W
    for x in range_w:
        D[x] = C[(x - 1) % W] ^ rol(C[(x + 1) % W], 1, lanew)
        for y in range_h:
            a[x][y] ^= D[x]

    # rho and pi
    B = zero()
    for x in range_w:
        for y in range_h:
            # The offset table is written for 64-bit lanes; narrower lanes
            # use the same offsets modulo their width.
            offset = RotationConstants[y][x] % lanew
            B[y % W][(2 * x + 3 * y) % H] = rol(a[x][y], offset, lanew)

    # chi
    for x in range_w:
        for y in range_h:
            # ~ yields a negative int, but the AND with a non-negative lane
            # keeps the result non-negative and within the lane width.
            a[x][y] = B[x][y] ^ ((~B[(x + 1) % W][y]) & B[(x + 2) % W][y])

    # iota: only the low `lanew` bits of the 64-bit constant apply
    a[0][0] ^= rc & lane_mask
def keccak_f(state):
    """Run the Keccak-f permutation on ``state``.

    Applies ``12 + 2 * l`` rounds of theta, rho, pi, chi and iota, where
    ``l`` is the base-2 logarithm of the lane width ``state.lanew``. The
    table ``state.s`` of the passed-in KeccakState is mutated in place.

    Args:
        state: The KeccakState to permute.
    """
    # Exact integer log2 (floor); avoids rounding through floating point.
    l = state.lanew.bit_length() - 1
    nr = 12 + 2 * l
    for ir in range(nr):
        f_round(state, state.s, RoundConstants[ir])
class KeccakState(object):
    """
    A keccak state container.

    Represents the internal state of the Keccak algorithm as a 5x5 table of
    lane integers indexed ``s[x][y]``. It provides helpers for converting
    between byte sequences and lanes, for absorbing and extracting data, and
    for formatting the state as a hexadecimal string.

    Lanes are serialized in whole bytes; lane widths below 8 bits are not
    byte-addressable and are not properly supported by the byte conversions.
    """

    W = 5
    H = 5
    range_w = list(range(W))
    range_h = list(range(H))

    @staticmethod
    def zero():
        """
        Returns a zero state table.
        """
        return [[0] * KeccakState.W for _ in KeccakState.range_h]

    @staticmethod
    def format(st, lanew=64):
        """
        Formats the state table st as hex, one line per y with the five
        lanes of that row separated by spaces. Each lane is zero-padded to
        the number of hex digits needed for lanew bits.
        """
        digits = (lanew + 3) // 4
        return "\n".join(
            " ".join("%0*x" % (digits, st[x][y]) for x in KeccakState.range_w)
            for y in KeccakState.range_h
        )

    @staticmethod
    def lane2bytes(s, w):
        """
        Converts the lane s to a sequence of byte values,
        assuming a lane is w bits. The least significant byte comes first.
        """
        return [(s >> shift) & 0xFF for shift in range(0, w, 8)]

    @staticmethod
    def bytes2lane(bb):
        """
        Converts a sequence of byte values to a lane.
        The first byte is the least significant one.
        """
        r = 0
        for b in reversed(bb):
            r = r << 8 | b
        return r

    def __init__(self, bitrate, b):
        """
        Creates a zeroed state of b bits in total, of which the first
        bitrate bits are the part that absorbs and emits data.
        """
        self.bitrate = bitrate
        self.b = b
        # only byte-aligned
        assert self.bitrate % 8 == 0
        self.bitrate_bytes = bits2bytes(self.bitrate)
        assert self.b % 25 == 0
        self.lanew = self.b // 25
        self.s = KeccakState.zero()

    def __str__(self):
        return KeccakState.format(self.s, self.lanew)

    def _lane_bytes(self):
        """
        Returns the number of bytes one lane occupies when serialized
        (the lane width rounded up to whole bytes).
        """
        return (self.lanew + 7) // 8

    def absorb(self, bb):
        """
        Mixes in the given bitrate-length string to the state.
        The argument is only read, never modified.
        """
        assert len(bb) == self.bitrate_bytes
        step = self._lane_bytes()
        i = 0
        for y in self.range_h:
            for x in self.range_w:
                # Slices past the end of bb are empty and convert to 0, so
                # the lanes beyond the bitrate are left untouched by the XOR.
                self.s[x][y] ^= KeccakState.bytes2lane(bb[i : i + step])
                i += step

    def squeeze(self):
        """
        Returns the bitrate-length prefix of the state to be output.
        """
        return self.get_bytes()[: self.bitrate_bytes]

    def get_bytes(self):
        """
        Convert whole state to a list of byte values, lane by lane
        (x varies fastest, then y).
        """
        out = []
        for y in self.range_h:
            for x in self.range_w:
                out.extend(KeccakState.lane2bytes(self.s[x][y], self.lanew))
        return out

    def set_bytes(self, bb):
        """
        Set whole state from byte string, which is assumed
        to be the correct length.
        """
        step = self._lane_bytes()
        i = 0
        for y in self.range_h:
            for x in self.range_w:
                self.s[x][y] = KeccakState.bytes2lane(bb[i : i + step])
                i += step
class KeccakSponge(object):
    """
    Implements the sponge construction of the Keccak algorithm. It absorbs
    input data, applies the permutation function after every absorbed or
    squeezed block, and produces output data of the requested length.

    Input that does not yet fill a whole block is kept in ``buffer`` as a
    list of byte values until more data arrives or the input is finalized.
    """

    def __init__(self, bitrate, width, padfn, permfn):
        """
        Args:
            bitrate: Block size of the sponge in bits.
            width: Total state size in bits.
            padfn: Called as padfn(used_bytes, block_bytes); returns the
                list of padding byte values for the final block.
            permfn: Called with the state to permute it in place.
        """
        self.state = KeccakState(bitrate, width)
        self.padfn = padfn
        self.permfn = permfn
        self.buffer = []

    def copy(self):
        """Creates a deep copy of the KeccakSponge object, including the internal state."""
        return deepcopy(self)

    def absorb_block(self, bb):
        """Absorbs a block of data of the bitrate length into the sponge's internal state."""
        assert len(bb) == self.state.bitrate_bytes
        self.state.absorb(bb)
        self.permfn(self.state)

    def absorb(self, s):
        """Absorbs input strings or bytes as byte data.

        Args:
            s: A str (encoded as latin-1, so only code points up to 255 are
                accepted) or a bytes-like object (bytes, bytearray,
                memoryview).

        Raises:
            TypeError: If s is neither a str nor a supported bytes-like object.
        """
        if isinstance(s, str):
            self.buffer += s.encode("latin1")
        elif isinstance(s, (bytes, bytearray, memoryview)):
            # Extending the list with a bytes object appends its byte values.
            self.buffer += bytes(s)
        else:
            raise TypeError("Expected bytes or str")
        rate = self.state.bitrate_bytes
        # Walk over the complete blocks with an offset and trim the buffer
        # once at the end, instead of copying the remainder after each block.
        consumed = 0
        while len(self.buffer) - consumed >= rate:
            self.absorb_block(self.buffer[consumed : consumed + rate])
            consumed += rate
        if consumed:
            self.buffer = self.buffer[consumed:]

    def absorb_final(self):
        """
        Used to apply padding to the remaining input data and absorb it into
        the sponge's internal state.
        """
        padded = self.buffer + self.padfn(len(self.buffer), self.state.bitrate_bytes)
        self.absorb_block(padded)
        self.buffer = []

    def squeeze_once(self):
        """Generates a single block of squeezed output data from the sponge's internal state."""
        rc = self.state.squeeze()
        self.permfn(self.state)
        return rc

    def squeeze(self, l):
        """Generates squeezed output data of the specified length from the sponge's internal state.

        Args:
            l (int): Length of squeezed data to return.

        Returns:
            The first l squeezed byte values. At least one block is always
            squeezed, even when l is 0.
        """
        Z = self.squeeze_once()
        while len(Z) < l:
            Z += self.squeeze_once()
        return Z[:l]
class KeccakHash(object):
    """
    The Keccak hash function, with a hashlib-compatible interface.

    A hash object is configured by its bitrate, capacity and output length.
    Input is fed in with update(); digest() and hexdigest() return the hash
    of everything fed in so far.
    """

    def __init__(self, bitrate_bits, capacity_bits, output_bits):
        width_bits = bitrate_bits + capacity_bits
        assert width_bits in (25, 50, 100, 200, 400, 800, 1600)
        # The sponge that accumulates input. It is never padded itself;
        # digest() pads and squeezes a copy instead.
        self.sponge = KeccakSponge(
            bitrate_bits, width_bits, multirate_padding, keccak_f
        )
        # hashlib interface members
        assert output_bits % 8 == 0
        self.digest_size = bits2bytes(output_bits)
        self.block_size = bits2bytes(bitrate_bits)

    def __repr__(self):
        state = self.sponge.state
        rate = state.bitrate
        capacity = state.b - state.bitrate
        image = self.digest_size * 8
        return "<KeccakHash with r=%d, c=%d, image=%d>" % (rate, capacity, image)

    def copy(self):
        """Returns a deep copy of this hash object, including the absorbed input."""
        return deepcopy(self)

    def update(self, s):
        """
        Feeds more input into the hash by passing it to the sponge's absorb().

        Args:
            s (str or bytes): The input data to add.
        """
        self.sponge.absorb(s)

    def digest(self):
        """
        Computes the hash of the input absorbed so far.

        Padding and squeezing are done on a copy of the sponge, so this
        object can keep receiving input afterwards.

        Returns:
            bytes: The hash value, digest_size bytes long.
        """
        snapshot = self.sponge.copy()
        snapshot.absorb_final()
        return bytes(snapshot.squeeze(self.digest_size))

    def hexdigest(self):
        """
        Computes the hash like digest(), but returns it hex-encoded.

        Returns:
            str: The hash value as a lowercase hex string.
        """
        raw = self.digest()
        return hexlify(raw).decode()

    @staticmethod
    def preset(bitrate_bits, capacity_bits, output_bits):
        """
        Returns a factory function for the given bitrate, sponge capacity and output length.
        The factory takes an optional initial input, like the hashlib
        constructors, and returns a new KeccakHash.
        """

        def create(initial_input=None):
            hasher = KeccakHash(bitrate_bits, capacity_bits, output_bits)
            if initial_input is None:
                return hasher
            hasher.update(initial_input)
            return hasher

        return create
# Ready-made hash factories. Each is called like a hashlib constructor, with
# an optional initial input. They pad with multirate_padding (0x01 ... 0x80),
# i.e. the original Keccak padding rather than the FIPS 202 SHA-3 variant.

#: Keccak-224 preset: bitrate of 1152 bits, capacity of 448 bits,
#: output length of 224 bits.
Keccak224 = KeccakHash.preset(1152, 448, 224)

#: Keccak-256 preset: bitrate of 1088 bits, capacity of 512 bits,
#: output length of 256 bits.
Keccak256 = KeccakHash.preset(1088, 512, 256)

#: Keccak-384 preset: bitrate of 832 bits, capacity of 768 bits,
#: output length of 384 bits.
Keccak384 = KeccakHash.preset(832, 768, 384)

#: Keccak-512 preset: bitrate of 576 bits, capacity of 1024 bits,
#: output length of 512 bits.
Keccak512 = KeccakHash.preset(576, 1024, 512)
def to_checksum_address(address):
    """Converts a hexadecimal Ethereum address to its mixed-case checksum format.

    The address is lowercased, every "0x" is removed, and the Keccak-256
    hash of the resulting text is computed. Each character whose matching
    hash digit is 8 or higher is uppercased. The input is not validated.

    Args:
        address (str): The hexadecimal Ethereum address to be converted.

    Returns:
        str: The checksum format of the address, prefixed with "0x".

    Example:
        >>> to_checksum_address("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed")
        '0x5aAeb6053F3E94C9b9A09f33669435E7Ef1BeAed'
    """
    address = address.lower().replace("0x", "")
    keccak_hash = Keccak256(address.encode("utf-8")).hexdigest()
    cased = [
        char.upper() if int(keccak_hash[pos], 16) >= 8 else char
        for pos, char in enumerate(address)
    ]
    return "0x" + "".join(cased)
def is_checksum_address(address):
    """
    Checks if a hexadecimal Ethereum address is in mixed-case checksum format.

    A leading "0x" or "0X" is ignored. The Keccak-256 hash of the lowercased
    remainder decides the expected case of every character: uppercase where
    the matching hash digit is above 7, lowercase otherwise. Only the letter
    case is verified; the length of the address and whether its characters
    are valid hex digits are not checked.

    Args:
        address (str): The hexadecimal Ethereum address to be checked.

    Returns:
        bool: True if every character has the expected case, False otherwise
        (including when the address has more characters than the hash has
        digits).

    Example:
        >>> is_checksum_address("0x5aAeb6053F3E94C9b9A09f33669435E7Ef1BeAed")
        True
        >>> is_checksum_address("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed")
        False
    """
    body = address[2:] if address[:2].lower() == "0x" else address
    address_hash = Keccak256(body.lower().encode("utf-8")).hexdigest()
    if len(body) > len(address_hash):
        # There is no hash digit to decide the case of the extra characters.
        return False
    for char, digit in zip(body, address_hash):
        # The nth letter should be uppercase if the nth hash digit is above 7
        expected = char.upper() if int(digit, 16) > 7 else char.lower()
        if char != expected:
            return False
    return True
def eth_transaction_hash(address: str, nonce: int) -> str:
    """Constructs an Ethereum transaction hash from an address and a transaction number.

    The lowercased address (without a leading "0x") and the nonce in
    hexadecimal are concatenated, decoded from hex to bytes and hashed with
    Keccak-256.

    Args:
        address: Hexadecimal address, with or without a "0x" prefix.
        nonce: Non-negative transaction number.

    Returns:
        The Keccak-256 hash as a lowercase hex string.

    Raises:
        ValueError: If the combined text is not valid hexadecimal (for
            example a non-hex address or a negative nonce).
    """
    address_hex = address.lower()
    if address_hex.startswith("0x"):
        # bytes.fromhex does not accept the prefix
        address_hex = address_hex[2:]
    nonce_hex = format(nonce, "x")
    # bytes.fromhex needs an even number of hex digits; pad the nonce with a
    # leading zero only when the combined length would otherwise be odd.
    if (len(address_hex) + len(nonce_hex)) % 2:
        nonce_hex = "0" + nonce_hex
    # Calculate the hash using keccak256
    return Keccak256(bytes.fromhex(address_hex + nonce_hex)).hexdigest()