Source code for zpywallet.address.loadbalancer

from .blockcypher import BlockcypherClient
from .blockstream import BlockstreamClient
from .esplora import EsploraClient
from .fullnode import RPCClient
from .mempoolspace import MempoolSpaceClient
from .web3node import Web3Client
from ..generated import wallet_pb2
from ..errors import NetworkException
from .provider import AddressProvider
from contextlib import suppress
from pathlib import Path
from tempfile import TemporaryDirectory

# TODO we currently have no easy way to update cache providers asynchronously.


[docs]class CryptoClient(AddressProvider): """Represents a list of crypto addresses. Developers should use this class, because it autoselects the most stable providers to fetch data from. """ def __init__( self, addresses, coin="BTC", chain="main", use_database=False, transactions=None, **kwargs, ): super().__init__(addresses, transactions=transactions) self.coin = coin.upper() self.cache_provider_list = [] self.provider_list = [] self.current_index = 0 self._database_initialized = False fullnode_endpoints = kwargs.get("fullnode_endpoints") or [] esplora_endpoints = kwargs.get("esplora_endpoints") or [] blockcypher_tokens = kwargs.get("blockcypher_tokens") or [] history_start_block = kwargs.get("history_start_block") history_lookback_blocks = kwargs.get("history_lookback_blocks") allow_unbounded_history_sync = kwargs.get( "allow_unbounded_history_sync", False ) include_pending_history = kwargs.get("include_pending_history", False) self._db_tempdir = None self.db_connection_parameters = None if use_database: self.db_connection_parameters = kwargs.get("db_connection_parameters") if not self.db_connection_parameters: # The docs promise sqlite as the default DBAPI backend. self._db_tempdir = TemporaryDirectory(prefix="zpywallet-") db_path = Path(self._db_tempdir.name) / "txcache.sqlite" self.db_connection_parameters = f"sqlite:///{db_path}" if use_database: for endpoint in fullnode_endpoints: with suppress(ValueError, NetworkException): self.cache_provider_list.append( RPCClient( addresses, coin, chain, transactions=self.transactions, db_connection_parameters=self.db_connection_parameters, history_start_block=history_start_block, history_lookback_blocks=history_lookback_blocks, allow_unbounded_history_sync=allow_unbounded_history_sync, include_pending_history=include_pending_history, **endpoint, ) ) with suppress(ValueError, NetworkException): self.cache_provider_list.append( Web3Client( addresses, coin, chain, transactions=self.transactions, db_connection_parameters=self.db_connection_parameters, history_start_block=history_start_block, history_lookback_blocks=history_lookback_blocks, allow_unbounded_history_sync=allow_unbounded_history_sync, include_pending_history=include_pending_history, **endpoint, ) ) for endpoint in esplora_endpoints: with suppress(ValueError, NetworkException): self.cache_provider_list.append( EsploraClient( addresses, coin, chain, transactions=self.transactions, db_connection_parameters=self.db_connection_parameters, history_start_block=history_start_block, **endpoint, ) ) with suppress(ValueError, NetworkException): self.cache_provider_list.append( BlockstreamClient( addresses, coin, chain, transactions=self.transactions, db_connection_parameters=self.db_connection_parameters, history_start_block=history_start_block, ) ) with suppress(ValueError, NetworkException): self.cache_provider_list.append( MempoolSpaceClient( addresses, coin, chain, transactions=self.transactions, db_connection_parameters=self.db_connection_parameters, history_start_block=history_start_block, ) ) for token in blockcypher_tokens: with suppress(ValueError, NetworkException): self.provider_list.append( BlockcypherClient( addresses, coin, chain, transactions=self.transactions, token=token, ) ) # Blockcypher without any token with suppress(ValueError, NetworkException): self.provider_list.append( BlockcypherClient( addresses, coin, chain, transactions=self.transactions, ) ) if not self.provider_list and not self.cache_provider_list: raise ValueError(f"No providers for coin '{coin}', chain '{chain}' found.") def _advance_to_next_provider(self): if not self.provider_list: return newindex = (self.current_index + 1) % len(self.provider_list) self.provider_list[newindex].transactions = self.provider_list[ self.current_index ].transactions self.current_index = newindex
[docs] def initialize_database(self): last_error = None for provider in self.cache_provider_list: # They all use the same database so we can just read the mempool # from the first one to populate the database. try: provider.read_mempool() self._database_initialized = True return except NetworkException as e: last_error = e continue if last_error is not None: raise last_error raise NetworkException("Failed to populate database - no cache providers available")
[docs] def get_balance(self): last_error = None for provider in self.cache_provider_list: try: return provider.get_balance() except NetworkException as e: last_error = e continue for provider in self.provider_list: try: return provider.get_balance() except NetworkException as e: last_error = e continue if self.coin == "ETH": if last_error is not None: raise last_error raise NetworkException("All address providers failed to get balance") return super().get_balance()
[docs] def get_block_height(self): """ Retrieves the current block height. Returns: int: The current block height. Raises: NetworkException: If the API request fails or the block height cannot be retrieved. """ for provider in self.cache_provider_list: try: h = provider.get_block_height() if h > 0: return h except NetworkException: continue for provider in self.provider_list: try: h = provider.get_block_height() if h > 0: return h except NetworkException: continue raise NetworkException("All address providers failed to get block height")
[docs] def get_transaction_history(self): """ Retrieves the transaction history of the Litecoin address from cached data augmented with network data. Returns: list: A list of transaction objects. Raises: NetworkException: If the API request fails or the transaction history cannot be retrieved. """ min_height = ( 0 if not self.transactions else max([tx.height for tx in self.transactions] + [-1]) ) last_error = None provider_succeeded = False for provider in self.cache_provider_list + self.provider_list: provider.transactions = self.transactions provider.height = min_height try: provider.get_transaction_history() self.transactions = provider.transactions provider_succeeded = True break except NetworkException as e: last_error = e continue if not provider_succeeded and not self.transactions and last_error is not None: raise last_error return self.transactions