例子
- 查找块
- 获取最新块
- 查看账户余额
- 兑换货币面值
- 进行交易
- 查找交易
- 查找收据
- 使用合约
- 通过 ethPM 处理合约
- 使用 ERC20 令牌合约
- Python 中的合约单元测试
- 使用 Infura Rinkeby 节点
- 调整日志级别
- 高级示例:获取所有令牌传输事件
以下是你可能想用 web3 做的一些常见的事情。
查找块
使用web3.eth.get_block
API,可以通过块的编号或散列来查找块。块散列应该以十六进制表示。块号
# get a block by number
>>> web3.eth.get_block(12345)
{
'author': '0xad5C1768e5974C231b2148169da064e61910f31a',
'difficulty': 735512610763,
'extraData': '0x476574682f76312e302e302f6c696e75782f676f312e342e32',
'gasLimit': 5000,
'gasUsed': 0,
'hash': '0x767c2bfb3bdee3f78676c1285cd757bcd5d8c272cef2eb30d9733800a78c0b6d',
'logsBloom': '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000',
'miner': '0xad5c1768e5974c231b2148169da064e61910f31a',
'mixHash': '0x31d9ec7e3855aeba37fd92aa1639845e70b360a60f77f12eff530429ef8cfcba',
'nonce': '0x549f882c5f356f85',
'number': 12345,
'parentHash': '0x4b3c1d7e65a507b62734feca1ee9f27a5379e318bd52ae62de7ba67dbeac66a3',
'receiptsRoot': '0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421',
'sealFields': ['0x31d9ec7e3855aeba37fd92aa1639845e70b360a60f77f12eff530429ef8cfcba',
'0x549f882c5f356f85'],
'sha3Uncles': '0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347',
'size': 539,
'stateRoot': '0xca495e22ed6b88c61714d129dbc8c94f5bf966ac581c09a57c0a72d0e55e7286',
'timestamp': 1438367030,
'totalDifficulty': 3862140487204603,
'transactions': [],
'transactionsRoot': '0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421',
'uncles': [],
}
# get a block by it's hash
>>> web3.eth.get_block('0x767c2bfb3bdee3f78676c1285cd757bcd5d8c272cef2eb30d9733800a78c0b6d')
{...}
获取最新块
您还可以使用web3.eth.get_block
API 中的字符串'latest'
来检索最新的块。
>>> web3.eth.get_block('latest')
{...}
如果你想知道最新的块号,你可以使用web3.eth.block_number
属性。
>>> web3.eth.block_number
4194803
查看账户余额
要找到一个帐户拥有的乙醚量,请使用 get_balance()
方法。在撰写本文时,最以太的帐户的公共地址为 0x 742d 35 cc 6634 c 0532925 a3 b 844 BC 454 e 4438 f 44 e。
>>> web3.eth.get_balance('0x742d35Cc6634C0532925a3b844Bc454e4438f44e')
3841357360894980500000001
注意,这个数不是用以太来命名的,而是用以太坊中最小的价值单位,卫来命名的。请继续阅读,了解如何将该数字转换为以太。
兑换货币面值
Web3 可以帮你在面额之间转换。支持以下面额。
| 名称 | 魏量 | | 魏 | one | | 奎 | One thousand | | 巴贝奇 | One thousand | | 女性伴侣 | One thousand | | mwei | One million | | 色鬼 | One million | | 微微醚 | One million | | 圭 | One billion | | 香农 | One billion | | 纳米 | One billion | | 毫微;纤(10 的负九次方) | One billion | | 绍博 | 1000000000000 | | 千分尺 | 1000000000000 | | 微观的,具体领域的 | 1000000000000 | | 芬尼 | 1000000000000000 | | 毫米 | 1000000000000000 | | 毫 | 1000000000000000 | | 醚 | 1000000000000000000 | | 凯瑟 | 1000000000000000000000 | | 宏伟的 | 1000000000000000000000 | | 梅特尔 | 1000000000000000000000000 | | 在一起 | 1000000000000000000000000000 | | 极限 | 1000000000000000000000000000000 |
从前面的例子来看,最大的帐户包含 3841357360894980500000001 魏。您可以使用fromWei()
方法将该余额转换为乙醚(或其他名称)。
>>> web3.fromWei(3841357360894980500000001, 'ether')
Decimal('3841357.360894980500000001')
要换算回魏,可以用反函数,toWei()
。注意 Python 的默认浮点精度对于这个用例来说是不够的,所以如果还没有的话,有必要将值转换成十进制的。
>>> from decimal import Decimal
>>> web3.toWei(Decimal('3841357.360894980500000001'), 'ether')
3841357360894980500000001
最佳实践:如果您需要处理多种货币面额,则默认为 wei。一个典型的工作流可能需要从某个单位转换到卫,然后从卫转换到您需要的任何单位。
>>> web3.toWei(Decimal('0.000000005'), 'ether')
5000000000
>>> web3.fromWei(5000000000, 'gwei')
Decimal('5')
进行交易
进行交易有几个选项:
-
Use this method if:
- 你想把乙醚从一个账户转到另一个账户。
-
Use this method if:
- 您希望在其他地方签署交易,例如硬件钱包。
- 您希望通过另一个供应商(例如 Infura)广播一个事务。
- 您还有其他一些需要更多灵活性的高级用例。
-
Use these methods if:
- 你想与合约互动。Web3.py 解析合约 ABI,并通过
functions
属性使这些函数可用。
- 你想与合约互动。Web3.py 解析合约 ABI,并通过
-
T2
construct_sign_and_send_raw_middleware()
Use this middleware if:
- 您希望在使用
w3.eth.send_transaction
或ContractFunctions
时自动签名。
- 您希望在使用
注意
您的键的位置(例如,本地或托管)将对这些方法产生影响。在此 阅读差异 。
查找交易
您可以使用web3.eth.get_transaction
功能查找交易。
>>> web3.eth.get_transaction('0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060')
{
'blockHash': '0x4e3a3754410177e6937ef1f84bba68ea139e8d1a2258c5f85db9f1cd715a1bdd',
'blockNumber': 46147,
'condition': None,
'creates': None,
'from': '0xA1E4380A3B1f749673E270229993eE55F35663b4',
'gas': 21000,
'gasPrice': None,
'hash': '0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060',
'input': '0x',
'maxFeePerGas': 2000000000,
'maxPriorityFeePerGas': 1000000000,
'networkId': None,
'nonce': 0,
'publicKey': '0x376fc429acc35e610f75b14bc96242b13623833569a5bb3d72c17be7e51da0bb58e48e2462a59897cead8ab88e78709f9d24fd6ec24d1456f43aae407a8970e4',
'r': '0x88ff6cf0fefd94db46111149ae4bfc179e9b94721fffd821d38d16464b3f71d0',
'raw': '0xf86780862d79883d2000825208945df9b87991262f6ba471f09758cde1c0fc1de734827a69801ca088ff6cf0fefd94db46111149ae4bfc179e9b94721fffd821d38d16464b3f71d0a045e0aff800961cfce805daef7016b9b675c137a6a41a548f7b60a3484c06a33a',
's': '0x45e0aff800961cfce805daef7016b9b675c137a6a41a548f7b60a3484c06a33a',
'standardV': '0x1',
'to': '0x5DF9B87991262F6BA471F09758CDE1c0FC1De734',
'transactionIndex': 0,
'v': '0x1c',
'value': 31337,
}
如果找不到给定散列的事务,那么这个函数将返回None
。
查找收据
可以使用web3.eth.get_transaction_receipt
API 检索交易收据。
>>> web3.eth.get_transaction_receipt('0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060')
{
'blockHash': '0x4e3a3754410177e6937ef1f84bba68ea139e8d1a2258c5f85db9f1cd715a1bdd',
'blockNumber': 46147,
'contractAddress': None,
'cumulativeGasUsed': 21000,
'gasUsed': 21000,
'logs': [],
'logsBloom': '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000',
'root': '0x96a8e009d2b88b1483e6941e6812e32263b05683fac202abc622a3e31aed1957',
'transactionHash': '0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060',
'transactionIndex': 0,
}
如果事务还没有被挖掘,那么这个方法将产生一个TransactionNotFound
错误。
使用合约
与现有合约互动
为了使用现有的合约,您需要它的部署地址和 ABI。两者都可以使用块浏览器找到,如 Etherscan。一旦实例化了一个合约实例,就可以读取数据和执行事务。
# Configure w3, e.g., w3 = Web3(...)
address = '0x1f9840a85d5aF5bf1D1762F925BDADdC4201F988'
abi = '[{"inputs":[{"internalType":"address","name":"account","type":"address"},{"internalType":"address","name":"minter_","type":"address"},...'
contract_instance = w3.eth.contract(address=address, abi=abi)
# read state:
contract_instance.functions.storedValue().call()
# 42
# update state:
tx_hash = contract_instance.functions.updateValue(43).transact()
部署新合约
给定以下存储在contract.sol
的实体源文件。
contract StoreVar { uint8 public _myVar; event MyEvent(uint indexed _var); function setVar(uint8 _var) public { _myVar = _var; emit MyEvent(_var); } function getVar() public view returns (uint8) { return _myVar; } }
以下示例演示了几件事情:
- 从 sol 文件编译合约。
- 估算交易的燃气成本。
- 使用合约功能进行交易。
- 等待挖掘交易收据。
import sys
import time
import pprint
from web3.providers.eth_tester import EthereumTesterProvider
from web3 import Web3
from eth_tester import PyEVMBackend
from solcx import compile_source
def compile_source_file(file_path):
with open(file_path, 'r') as f:
source = f.read()
return compile_source(source)
def deploy_contract(w3, contract_interface):
tx_hash = w3.eth.contract(
abi=contract_interface['abi'],
bytecode=contract_interface['bin']).constructor().transact()
address = w3.eth.get_transaction_receipt(tx_hash)['contractAddress']
return address
w3 = Web3(EthereumTesterProvider(PyEVMBackend()))
contract_source_path = 'contract.sol'
compiled_sol = compile_source_file('contract.sol')
contract_id, contract_interface = compiled_sol.popitem()
address = deploy_contract(w3, contract_interface)
print(f'Deployed {contract_id} to: {address}\n')
store_var_contract = w3.eth.contract(address=address, abi=contract_interface["abi"])
gas_estimate = store_var_contract.functions.setVar(255).estimate_gas()
print(f'Gas estimate to transact with setVar: {gas_estimate}')
if gas_estimate < 100000:
print("Sending transaction to setVar(255)\n")
tx_hash = store_var_contract.functions.setVar(255).transact()
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
print("Transaction receipt mined:")
pprint.pprint(dict(receipt))
print("\nWas transaction successful?")
pprint.pprint(receipt["status"])
else:
print("Gas cost exceeds 100000")
输出:
Deployed <stdin>:StoreVar to: 0xF2E246BB76DF876Cef8b38ae84130F4F55De395b
Gas estimate to transact with setVar: 45535
Sending transaction to setVar(255)
Transaction receipt mined:
{'blockHash': HexBytes('0x837609ad0a404718c131ac5157373662944b778250a507783349d4e78bd8ac84'),
'blockNumber': 2,
'contractAddress': None,
'cumulativeGasUsed': 43488,
'gasUsed': 43488,
'logs': [AttributeDict({'type': 'mined', 'logIndex': 0, 'transactionIndex': 0, 'transactionHash': HexBytes('0x50aa3ba0673243f1e60f546a12ab364fc2f6603b1654052ebec2b83d4524c6d0'), 'blockHash': HexBytes('0x837609ad0a404718c131ac5157373662944b778250a507783349d4e78bd8ac84'), 'blockNumber': 2, 'address': '0xF2E246BB76DF876Cef8b38ae84130F4F55De395b', 'data': '0x', 'topics': [HexBytes('0x6c2b4666ba8da5a95717621d879a77de725f3d816709b9cbe9f059b8f875e284'), HexBytes('0x00000000000000000000000000000000000000000000000000000000000000ff')]})],
'status': 1,
'transactionHash': HexBytes('0x50aa3ba0673243f1e60f546a12ab364fc2f6603b1654052ebec2b83d4524c6d0'),
'transactionIndex': 0}
Was transaction successful?
1
ethPM 包包含已配置好的合约,随时可以使用。Web3 的ethpm
模块(web3.pm
)扩展了 Web3 的本机Contract
模块,对如何实例化Contract
工厂和实例做了一些修改。
您所需要的只是您想要使用的包的包名、版本和 ethPM 注册地址。ethPM 注册表是与 ethPM 包相关联的发布数据的链上数据存储。您可以在 ethPM 注册表中找到一些示例注册表。请记住,您应该只使用注册中心的包,您相信这些注册中心的维护者不会注入恶意代码!
在这个例子中,我们将使用来自ens.snakecharmers.eth
注册表的ethregistrar@3.0.0
包。
web3.pm
使用Package
类来表示一个 ethPM 包。这个对象在一个包中包含了所有的合约资产,并通过一个 API 公开它们。因此,在我们能够与我们的包交互之前,我们需要将它生成为一个Package
实例。
from web3.auto.infura import w3
# Note. To use the web3.pm module, you will need to instantiate your w3 instance
# with a web3 provider connected to the chain on which your registry lives.
# The ethPM module is still experimental and subject to change,
# so for now we need to enable it via a temporary flag.
w3.enable_unstable_package_management_api()
# Then we need to set the registry address that we want to use.
# This should be an ENS address, but can also be a checksummed contract address.
w3.pm.set_registry("ens.snakecharmers.eth")
# This generates a Package instance of the target ethPM package.
ens_package = w3.pm.get_package("ethregistrar", "3.0.0")
现在我们有了目标 ethPM 包的一个Package
表示,我们可以从这个Package
生成合约工厂和实例。但是,需要注意的是,有些包可能缺少生成实例或工厂所需的必要合约资产。您可以使用 ethPM CLI 来确定 ethPM 包中可用的合约类型和部署。
# To interact with a deployment located in an ethPM package.
# Note. This will only expose deployments located on the
# chain of the connected provider (in this example, mainnet)
mainnet_registrar = ens_package.deployments.get_instance("BaseRegistrarImplementation")
# Now you can treat mainnet_registrar like any other Web3 Contract instance!
mainnet_registrar.caller.balanceOf("0x123...")
> 0
mainnet_registrar.functions.approve("0x123", 100000).transact()
> 0x123abc... # tx_hash
# To create a contract factory from a contract type located in an ethPM package.
registrar_factory = ens_package.get_contract_factory("BaseRegistrarImplementation")
# Now you can treat registrar_factory like any other Web3 Contract factory to deploy new instances!
# Note. This will deploy new instances to the chain of the connected provider (in this example, mainnet)
registrar_factory.constructor(...).transact()
> 0x456def... # tx_hash
# To connect your Package to a new chain - simply pass it a new Web3 instance
# connected to your provider of choice. Now your factories will automatically
# deploy to this new chain, and the deployments available on a package will
# be automatically filtered to those located on the new chain.
from web3.auto.infura.goerli import w3 as goerli_w3
goerli_registrar = ens_package.update_w3(goerli_w3)
使用 ERC20 令牌合约
以太坊区块链上的大多数可替换代币都符合 ERC20 标准。指南的这一部分介绍了如何与符合此标准的现有令牌合约进行交互。
在本指南中,我们将与已经部署到本地测试链的现有令牌合约进行交互。本指南假设:
- 已知地址的现有令牌协定。
- 对给定合约的适当
ABI
的访问。 - 一个
web3.main.Web3
实例连接到一个供应商,该供应商拥有一个可以发送交易的未锁定帐户。
创建合约工厂
首先,我们需要用令牌合约的地址和ERC20
ABI 创建一个合约实例。
>>> contract = w3.eth.contract(contract_address, abi=ABI)
>>> contract.address
'0xF2E246BB76DF876Cef8b38ae84130F4F55De395b'
查询令牌元数据
每个代币将有一个代表流通代币总数的总供应量。在本例中,我们将令牌合约初始化为拥有 100 万个令牌。由于该令牌合约设置为具有 18 位小数,因此该合约返回的原始总供应量将具有 18 位额外的小数。
>>> contract.functions.name().call()
'TestToken'
>>> contract.functions.symbol().call()
'TEST'
>>> decimals = contract.functions.decimals().call()
>>> decimals
18
>>> DECIMALS = 10 ** decimals
>>> contract.functions.totalSupply().call() // DECIMALS
1000000
查询账户余额
接下来,我们可以使用合约的balanceOf
函数查询一些账户余额。我们正在使用的令牌合约从一个帐户开始,我们称之为alice
持有所有令牌。
>>> alice = '0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf'
>>> bob = '0x2B5AD5c4795c026514f8317c7a215E218DcCD6cF'
>>> raw_balance = contract.functions.balanceOf(alice).call()
>>> raw_balance
1000000000000000000000000
>>> raw_balance // DECIMALS
1000000
>>> contract.functions.balanceOf(bob).call()
0
发送令牌
接下来,我们可以使用合约的transfer
函数将一些令牌从alice
转移到bob
。
>>> tx_hash = contract.functions.transfer(bob, 100).transact({'from': alice})
>>> tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
>>> contract.functions.balanceOf(alice).call()
999999999999999999999900
>>> contract.functions.balanceOf(bob).call()
100
创建对外转账审批
爱丽丝也可以批准其他人使用approve
功能从她的账户消费代币。我们还可以使用allowance
函数查询我们被批准花费多少代币。
>>> contract.functions.allowance(alice, bob).call()
0
>>> tx_hash = contract.functions.approve(bob, 200).transact({'from': alice})
>>> tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
>>> contract.functions.allowance(alice, bob).call()
200
执行外部传输
当有人有零花钱时,他们可以使用transferFrom
功能转移这些代币。
>>> contract.functions.allowance(alice, bob).call()
200
>>> contract.functions.balanceOf(bob).call()
100
>>> tx_hash = contract.functions.transferFrom(alice, bob, 75).transact({'from': bob})
>>> tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
>>> contract.functions.allowance(alice, bob).call()
125
>>> contract.functions.balanceOf(bob).call()
175
Python 中的合约单元测试
这里有一个例子,说明如何在 python、Web3.py、eth-tester 和 PyEVM 中使用 pytest 框架,完全在 python 中执行单元测试,而不需要任何额外的全功能以太坊节点/客户端。要安装所需的依赖项,可以在 web3 和 pytest 中使用针对 eth_tester 的 pinned extra:
$ pip install web3[tester] pytest
一旦建立了测试环境,就可以像这样编写测试:
import pytest from web3 import ( EthereumTesterProvider, Web3, ) @pytest.fixture def tester_provider(): return EthereumTesterProvider() @pytest.fixture def eth_tester(tester_provider): return tester_provider.ethereum_tester @pytest.fixture def w3(tester_provider): return Web3(tester_provider) @pytest.fixture def foo_contract(eth_tester, w3): # For simplicity of this example we statically define the # contract code here. You might read your contracts from a # file, or something else to test with in your own code # # pragma solidity^0.5.3; # # contract Foo { # # string public bar; # event barred(string _bar); # # constructor() public { # bar = "hello world"; # } # # function setBar(string memory _bar) public { # bar = _bar; # emit barred(_bar); # } # # } deploy_address = eth_tester.get_accounts()[0] abi = """[{"anonymous":false,"inputs":[{"indexed":false,"name":"_bar","type":"string"}],"name":"barred","type":"event"},{"constant":false,"inputs":[{"name":"_bar","type":"string"}],"name":"setBar","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"inputs":[],"payable":false,"stateMutability":"nonpayable","type":"constructor"},{"constant":true,"inputs":[],"name":"bar","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"}]""" # noqa: E501 # This bytecode is the output of compiling with # solc version:0.5.3+commit.10d17f24.Emscripten.clang bytecode = """608060405234801561001057600080fd5b506040805190810160405280600b81526020017f68656c6c6f20776f726c640000000000000000000000000000000000000000008152506000908051906020019061005c929190610062565b50610107565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f106100a357805160ff19168380011785556100d1565b828001600101855582156100d1579182015b828111156100d05782518255916020019190600101906100b5565b5b5090506100de91906100e2565b5090565b61010491905b808211156101005760008160009055506001016100e8565b5090565b90565b6103bb806101166000396000f3fe608060405234801561001057600080fd5b5060043610610053576000357c01000000000000000000000000000000000000000000000000000000009004806397bc14aa14610058578063febb0f7e14610113575b600080fd5b6101116004803603602081101561006e57600080fd5b810190808035906020019064010000000081111561008b57600080fd5b82018360208201111561009d57600080fd5b803590602001918460018302840111640100000000831117156100bf57600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600081840152601f19601f820116905080830192505050505050509192919290505050610196565b005b61011b61024c565b6040518080602001828103825283818151815260200191508051906020019080838360005b8381101561015b578082015181840152602081019050610140565b50505050905090810190601f1680156101885780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b80600090805190602001906101ac9291906102ea565b507f5f71ad82e16f082de5ff496b140e2fbc8621eeb37b36d59b185c3f1364bbd529816040518080602001828103825283818151815260200191508051906020019080838360005b8381101561020f5780820151818401526020810190506101f4565b50505050905090810190601f16801561023c5780820380516001836020036101000a031916815260200191505b509250505060405180910390a150565b60008054600181600116156101000203166002900480601f0160208091040260200160405190810160405280929190818152602001828054600181600116156101000203166002900480156102e25780601f106102b7576101008083540402835291602001916102e2565b820191906000526020600020905b8154815290600101906020018083116102c557829003601f168201915b505050505081565b828054600181600116156101000203166002900490600052602060002090601f016020900481019282601f1061032b57805160ff1916838001178555610359565b82800160010185558215610359579182015b8281111561035857825182559160200191906001019061033d565b5b509050610366919061036a565b5090565b61038c91905b80821115610388576000816000905550600101610370565b5090565b9056fea165627a7a72305820ae6ca683d45ee8a71bba45caee29e4815147cd308f772c853a20dfe08214dbb50029""" # noqa: E501 # Create our contract class. FooContract = w3.eth.contract(abi=abi, bytecode=bytecode) # issue a transaction to deploy the contract. tx_hash = FooContract.constructor().transact({ 'from': deploy_address, }) # wait for the transaction to be mined tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash, 180) # instantiate and return an instance of our contract. return FooContract(tx_receipt.contractAddress) def test_initial_greeting(foo_contract): hw = foo_contract.caller.bar() assert hw == "hello world" def test_can_update_greeting(w3, foo_contract): # send transaction that updates the greeting tx_hash = foo_contract.functions.setBar( "testing contracts is easy", ).transact({ 'from': w3.eth.accounts[1], }) w3.eth.wait_for_transaction_receipt(tx_hash, 180) # verify that the contract is now using the updated greeting hw = foo_contract.caller.bar() assert hw == "testing contracts is easy" def test_updating_greeting_emits_event(w3, foo_contract): # send transaction that updates the greeting tx_hash = foo_contract.functions.setBar( "testing contracts is easy", ).transact({ 'from': w3.eth.accounts[1], }) receipt = w3.eth.wait_for_transaction_receipt(tx_hash, 180) # get all of the `barred` logs for the contract logs = foo_contract.events.barred.getLogs() assert len(logs) == 1 # verify that the log's data matches the expected value event = logs[0] assert event.blockHash == receipt.blockHash assert event.args._bar == "testing contracts is easy"
使用 Infura Rinkeby 节点
导入所需的库
from web3 import Web3, HTTPProvider
用 Infura 节点初始化 web3 实例
w3 = Web3(Web3.HTTPProvider("https://rinkeby.infura.io/v3/YOUR_INFURA_KEY"))
将中间件注入到中间件洋葱中
from web3.middleware import geth_poa_middleware
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
请记住,您必须在本地签署所有交易,因为 infura 不会处理您钱包中的任何钥匙(参见此)
transaction = contract.functions.function_Name(params).build_transaction()
transaction.update({ 'gas' : appropriate_gas_amount })
transaction.update({ 'nonce' : w3.eth.get_transaction_count('Your_Wallet_Address') })
signed_tx = w3.eth.account.sign_transaction(transaction, private_key)
页(page 的缩写)s:对交易字典进行了两次更新,因为原始交易可能不包含 gas & nonce 金额,所以您必须手动添加它们。
最后,发送交易
txn_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
txn_receipt = w3.eth.wait_for_transaction_receipt(txn_hash)
提示:之后,您可以使用存储在txn_hash
中的值,在像 etherscan 这样的浏览器中查看交易的细节
调整日志级别
Web3.py 内部使用 Python 日志子系统。
如果您想在调试模式下运行您的应用程序日志,下面是一个如何使一些 JSON-RPC 流量更安静的例子。
import logging
import coloredlogs
def setup_logging(log_level=logging.DEBUG):
"""Setup root logger and quiet some levels."""
logger = logging.getLogger()
# Set log format to dislay the logger name to hunt down verbose logging modules
fmt = "%(name)-25s %(levelname)-8s %(message)s"
# Use colored logging output for console with the coloredlogs package
# https://pypi.org/project/coloredlogs/
coloredlogs.install(level=log_level, fmt=fmt, logger=logger)
# Disable logging of JSON-RPC requests and replies
logging.getLogger("web3.RequestManager").setLevel(logging.WARNING)
logging.getLogger("web3.providers.HTTPProvider").setLevel(logging.WARNING)
# logging.getLogger("web3.RequestManager").propagate = False
# Disable all internal debug logging of requests and urllib3
# E.g. HTTP traffic
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
return logger
高级示例:获取所有令牌传输事件
在这个例子中,我们展示了如何从以太坊区块链获取特定事件类型的所有事件。处理大量事件时有三个挑战:
- 如何增量更新已获取事件的现有数据库
- 如何处理长时间运行过程中的中断
- 如何应对 eth_getLogs JSON-RPC 调用查询限制
- 如何在(近)实时数据中处理以太坊小链重组
eth_getLogs 限制
Ethereum JSON-RPC API 服务器,像 Geth,不提供简单的事件分页,只提供块分页。没有请求可以找到带有事件的第一个块,或者在一个块范围内发生了多少个事件。JSON-RPC 服务给你的唯一反馈是 eth_getLogs 调用是否失败。
在这个示例脚本中,我们提供了两种启发式方法来处理这个问题。该脚本扫描块中的事件(起始块号-结束块号)。然后,它使用两种方法来确定一个块窗口中可能有多少个事件:
- 动态设置块范围窗口大小,同时从不超过阈值(例如,10,000 个块)。
- 如果 eth_getLogs JSON-PRC 调用出现超时错误,请减少结束块号,并使用较小的块范围窗口重试。
示例代码
下面的示例代码被分成一个可重用的EventScanner
类和一个演示脚本:
- 获取 RCC 令牌的所有传输事件,
- 可以再次递增运行以检查是否有新的事件,
- 优雅地处理中断(例如,CTRL+C 中止),
- 将所有
Transfer
事件写入 JSON 数据库的单个文件中,以便其他进程可以使用它们, - 在控制台中使用 tqdm 库进行进度条输出,
- 只支持
HTTPS
供应商,因为 JSON-RPC 重试逻辑依赖于底层协议的实现细节, - 禁用标准
http_retry_request_middleware
,因为它不知道如何处理eth_getLogs
的收缩块范围窗口,以及 - 消耗大约 20k 个 JSON-RPC API 调用。
该脚本可以使用:python ./eventscanner.py <your JSON-RPC API URL>
运行。
"""A stateful event scanner for Ethereum-based blockchains using Web3.py.
With the stateful mechanism, you can do one batch scan or incremental scans,
where events are added wherever the scanner left off.
"""
import datetime
import time
import logging
from abc import ABC, abstractmethod
from typing import Tuple, Optional, Callable, List, Iterable
from web3 import Web3
from web3.contract import Contract
from web3.datastructures import AttributeDict
from web3.exceptions import BlockNotFound
from eth_abi.codec import ABICodec
# Currently this method is not exposed over official web3 API,
# but we need it to construct eth_getLogs parameters
from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data
logger = logging.getLogger(__name__)
class EventScannerState(ABC):
"""Application state that remembers what blocks we have scanned in the case of crash.
"""
@abstractmethod
def get_last_scanned_block(self) -> int:
"""Number of the last block we have scanned on the previous cycle.
:return: 0 if no blocks scanned yet
"""
@abstractmethod
def start_chunk(self, block_number: int):
"""Scanner is about to ask data of multiple blocks over JSON-RPC.
Start a database session if needed.
"""
@abstractmethod
def end_chunk(self, block_number: int):
"""Scanner finished a number of blocks.
Persistent any data in your state now.
"""
@abstractmethod
def process_event(self, block_when: datetime.datetime, event: AttributeDict) -> object:
"""Process incoming events.
This function takes raw events from Web3, transforms them to your application internal
format, then saves them in a database or some other state.
:param block_when: When this block was mined
:param event: Symbolic dictionary of the event data
:return: Internal state structure that is the result of event tranformation.
"""
@abstractmethod
def delete_data(self, since_block: int) -> int:
"""Delete any data since this block was scanned.
Purges any potential minor reorg data.
"""
class EventScanner:
"""Scan blockchain for events and try not to abuse JSON-RPC API too much.
Can be used for real-time scans, as it detects minor chain reorganisation and rescans.
Unlike the easy web3.contract.Contract, this scanner can scan events from multiple contracts at once.
For example, you can get all transfers from all tokens in the same scan.
You *should* disable the default `http_retry_request_middleware` on your provider for Web3,
because it cannot correctly throttle and decrease the `eth_getLogs` block number range.
"""
def __init__(self, web3: Web3, contract: Contract, state: EventScannerState, events: List, filters: {},
max_chunk_scan_size: int = 10000, max_request_retries: int = 30, request_retry_seconds: float = 3.0):
"""
:param contract: Contract
:param events: List of web3 Event we scan
:param filters: Filters passed to getLogs
:param max_chunk_scan_size: JSON-RPC API limit in the number of blocks we query. (Recommendation: 10,000 for mainnet, 500,000 for testnets)
:param max_request_retries: How many times we try to reattempt a failed JSON-RPC call
:param request_retry_seconds: Delay between failed requests to let JSON-RPC server to recover
"""
self.logger = logger
self.contract = contract
self.web3 = web3
self.state = state
self.events = events
self.filters = filters
# Our JSON-RPC throttling parameters
self.min_scan_chunk_size = 10 # 12 s/block = 120 seconds period
self.max_scan_chunk_size = max_chunk_scan_size
self.max_request_retries = max_request_retries
self.request_retry_seconds = request_retry_seconds
# Factor how fast we increase the chunk size if results are found
# # (slow down scan after starting to get hits)
self.chunk_size_decrease = 0.5
# Factor how was we increase chunk size if no results found
self.chunk_size_increase = 2.0
@property
def address(self):
return self.token_address
def get_block_timestamp(self, block_num) -> datetime.datetime:
"""Get Ethereum block timestamp"""
try:
block_info = self.web3.eth.getBlock(block_num)
except BlockNotFound:
# Block was not mined yet,
# minor chain reorganisation?
return None
last_time = block_info["timestamp"]
return datetime.datetime.utcfromtimestamp(last_time)
def get_suggested_scan_start_block(self):
"""Get where we should start to scan for new token events.
If there are no prior scans, start from block 1.
Otherwise, start from the last end block minus ten blocks.
We rescan the last ten scanned blocks in the case there were forks to avoid
misaccounting due to minor single block works (happens once in a hour in Ethereum).
These heurestics could be made more robust, but this is for the sake of simple reference implementation.
"""
end_block = self.get_last_scanned_block()
if end_block:
return max(1, end_block - self.NUM_BLOCKS_RESCAN_FOR_FORKS)
return 1
def get_suggested_scan_end_block(self):
"""Get the last mined block on Ethereum chain we are following."""
# Do not scan all the way to the final block, as this
# block might not be mined yet
return self.web3.eth.blockNumber - 1
def get_last_scanned_block(self) -> int:
return self.state.get_last_scanned_block()
def delete_potentially_forked_block_data(self, after_block: int):
"""Purge old data in the case of blockchain reorganisation."""
self.state.delete_data(after_block)
def scan_chunk(self, start_block, end_block) -> Tuple[int, datetime.datetime, list]:
"""Read and process events between to block numbers.
Dynamically decrease the size of the chunk if the case JSON-RPC server pukes out.
:return: tuple(actual end block number, when this block was mined, processed events)
"""
block_timestamps = {}
get_block_timestamp = self.get_block_timestamp
# Cache block timestamps to reduce some RPC overhead
# Real solution might include smarter models around block
def get_block_when(block_num):
if block_num not in block_timestamps:
block_timestamps[block_num] = get_block_timestamp(block_num)
return block_timestamps[block_num]
all_processed = []
for event_type in self.events:
# Callable that takes care of the underlying web3 call
def _fetch_events(_start_block, _end_block):
return _fetch_events_for_all_contracts(self.web3,
event_type,
self.filters,
from_block=_start_block,
to_block=_end_block)
# Do `n` retries on `eth_getLogs`,
# throttle down block range if needed
end_block, events = _retry_web3_call(
_fetch_events,
start_block=start_block,
end_block=end_block,
retries=self.max_request_retries,
delay=self.request_retry_seconds)
for evt in events:
idx = evt["logIndex"] # Integer of the log index position in the block, null when its pending
# We cannot avoid minor chain reorganisations, but
# at least we must avoid blocks that are not mined yet
assert idx is not None, "Somehow tried to scan a pending block"
block_number = evt["blockNumber"]
# Get UTC time when this event happened (block mined timestamp)
# from our in-memory cache
block_when = get_block_when(block_number)
logger.debug("Processing event %s, block:%d count:%d", evt["event"], evt["blockNumber"])
processed = self.state.process_event(block_when, evt)
all_processed.append(processed)
end_block_timestamp = get_block_when(end_block)
return end_block, end_block_timestamp, all_processed
def estimate_next_chunk_size(self, current_chuck_size: int, event_found_count: int):
"""Try to figure out optimal chunk size
Our scanner might need to scan the whole blockchain for all events
* We want to minimize API calls over empty blocks
* We want to make sure that one scan chunk does not try to process too many entries once, as we try to control commit buffer size and potentially asynchronous busy loop
* Do not overload node serving JSON-RPC API by asking data for too many events at a time
Currently Ethereum JSON-API does not have an API to tell when a first event occurred in a blockchain
and our heuristics try to accelerate block fetching (chunk size) until we see the first event.
These heurestics exponentially increase the scan chunk size depending on if we are seeing events or not.
When any transfers are encountered, we are back to scanning only a few blocks at a time.
It does not make sense to do a full chain scan starting from block 1, doing one JSON-RPC call per 20 blocks.
"""
if event_found_count > 0:
# When we encounter first events, reset the chunk size window
current_chuck_size = self.min_scan_chunk_size
else:
current_chuck_size *= self.chunk_size_increase
current_chuck_size = max(self.min_scan_chunk_size, current_chuck_size)
current_chuck_size = min(self.max_scan_chunk_size, current_chuck_size)
return int(current_chuck_size)
def scan(self, start_block, end_block, start_chunk_size=20, progress_callback=Optional[Callable]) -> Tuple[
list, int]:
"""Perform a token balances scan.
Assumes all balances in the database are valid before start_block (no forks sneaked in).
:param start_block: The first block included in the scan
:param end_block: The last block included in the scan
:param start_chunk_size: How many blocks we try to fetch over JSON-RPC on the first attempt
:param progress_callback: If this is an UI application, update the progress of the scan
:return: [All processed events, number of chunks used]
"""
assert start_block <= end_block
current_block = start_block
# Scan in chunks, commit between
chunk_size = start_chunk_size
last_scan_duration = last_logs_found = 0
total_chunks_scanned = 0
# All processed entries we got on this scan cycle
all_processed = []
while current_block <= end_block:
self.state.start_chunk(current_block, chunk_size)
# Print some diagnostics to logs to try to fiddle with real world JSON-RPC API performance
estimated_end_block = current_block + chunk_size
logger.debug(
"Scanning token transfers for blocks: %d - %d, chunk size %d, last chunk scan took %f, last logs found %d",
current_block, estimated_end_block, chunk_size, last_scan_duration, last_logs_found)
start = time.time()
actual_end_block, end_block_timestamp, new_entries = self.scan_chunk(current_block, estimated_end_block)
# Where does our current chunk scan ends - are we out of chain yet?
current_end = actual_end_block
last_scan_duration = time.time() - start
all_processed += new_entries
# Print progress bar
if progress_callback:
progress_callback(start_block, end_block, current_block, end_block_timestamp, chunk_size, len(new_entries))
# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self.estimate_next_chunk_size(chunk_size, len(new_entries))
# Set where the next chunk starts
current_block = current_end + 1
total_chunks_scanned += 1
self.state.end_chunk(current_end)
return all_processed, total_chunks_scanned
def _retry_web3_call(func, start_block, end_block, retries, delay) -> Tuple[int, list]:
"""A custom retry loop to throttle down block range.
If our JSON-RPC server cannot serve all incoming `eth_getLogs` in a single request,
we retry and throttle down block range for every retry.
For example, Go Ethereum does not indicate what is an acceptable response size.
It just fails on the server-side with a "context was cancelled" warning.
:param func: A callable that triggers Ethereum JSON-RPC, as func(start_block, end_block)
:param start_block: The initial start block of the block range
:param end_block: The initial start block of the block range
:param retries: How many times we retry
:param delay: Time to sleep between retries
"""
for i in range(retries):
try:
return end_block, func(start_block, end_block)
except Exception as e:
# Assume this is HTTPConnectionPool(host='localhost', port=8545): Read timed out. (read timeout=10)
# from Go Ethereum. This translates to the error "context was cancelled" on the server side:
# https://github.com/ethereum/go-ethereum/issues/20426
if i < retries - 1:
# Give some more verbose info than the default middleware
logger.warning(
"Retrying events for block range %d - %d (%d) failed with %s, retrying in %s seconds",
start_block,
end_block,
end_block-start_block,
e,
delay)
# Decrease the `eth_getBlocks` range
end_block = start_block + ((end_block - start_block) // 2)
# Let the JSON-RPC to recover e.g. from restart
time.sleep(delay)
continue
else:
logger.warning("Out of retries")
raise
def _fetch_events_for_all_contracts(
web3,
event,
argument_filters: dict,
from_block: int,
to_block: int) -> Iterable:
"""Get events using eth_getLogs API.
This method is detached from any contract instance.
This is a stateless method, as opposed to createFilter.
It can be safely called against nodes which do not provide `eth_newFilter` API, like Infura.
"""
if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")
# Currently no way to poke this using a public Web3.py API.
# This will return raw underlying ABI JSON object for the event
abi = event._get_event_abi()
# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec
# Here we need to poke a bit into Web3 internals, as this
# functionality is not exposed by default.
# Construct JSON-RPC raw filter presentation based on human readable Python descriptions
# Namely, convert event names to their keccak signatures
# More information here:
# https://github.com/ethereum/web3.py/blob/e176ce0793dafdd0573acc8d4b76425b6eb604ca/web3/_utils/filters.py#L71
data_filter_set, event_filter_params = construct_event_filter_params(
abi,
codec,
address=argument_filters.get("address"),
argument_filters=argument_filters,
fromBlock=from_block,
toBlock=to_block
)
logger.debug("Querying eth_getLogs with the following parameters: %s", event_filter_params)
# Call JSON-RPC API on your Ethereum node.
# get_logs() returns raw AttributedDict entries
logs = web3.eth.get_logs(event_filter_params)
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
# More information how processLog works here
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
evt = get_event_data(codec, abi, log)
# Note: This was originally yield,
# but deferring the timeout exception caused the throttle logic not to work
all_events.append(evt)
return all_events
if __name__ == "__main__":
# Simple demo that scans all the token transfers of RCC token (11k).
# The demo supports persistant state by using a JSON file.
# You will need an Ethereum node for this.
# Running this script will consume around 20k JSON-RPC calls.
# With locally running Geth, the script takes 10 minutes.
# The resulting JSON state file is 2.9 MB.
import sys
import json
from web3.providers.rpc import HTTPProvider
# We use tqdm library to render a nice progress bar in the console
# https://pypi.org/project/tqdm/
from tqdm import tqdm
# RCC has around 11k Transfer events
# https://etherscan.io/token/0x9b6443b0fb9c241a7fdac375595cea13e6b7807a
RCC_ADDRESS = "0x9b6443b0fB9C241A7fdAC375595cEa13e6B7807A"
# Reduced ERC-20 ABI, only Transfer event
ABI = """[
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "from",
"type": "address"
},
{
"indexed": true,
"name": "to",
"type": "address"
},
{
"indexed": false,
"name": "value",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
}
]
"""
class JSONifiedState(EventScannerState):
"""Store the state of scanned blocks and all events.
All state is an in-memory dict.
Simple load/store massive JSON on start up.
"""
def __init__(self):
self.state = None
self.fname = "test-state.json"
# How many second ago we saved the JSON file
self.last_save = 0
def reset(self):
"""Create initial state of nothing scanned."""
self.state = {
"last_scanned_block": 0,
"blocks": {},
}
def restore(self):
"""Restore the last scan state from a file."""
try:
self.state = json.load(open(self.fname, "rt"))
print(f"Restored the state, previously {self.state['last_scanned_block']} blocks have been scanned")
except (IOError, json.decoder.JSONDecodeError):
print("State starting from scratch")
self.reset()
def save(self):
"""Save everything we have scanned so far in a file."""
with open(self.fname, "wt") as f:
json.dump(self.state, f)
self.last_save = time.time()
#
# EventScannerState methods implemented below
#
def get_last_scanned_block(self):
"""The number of the last block we have stored."""
return self.state["last_scanned_block"]
def delete_data(self, since_block):
"""Remove potentially reorganised blocks from the scan data."""
for block_num in range(since_block, self.get_last_scanned_block()):
if block_num in self.state["blocks"]:
del self.state["blocks"][block_num]
def start_chunk(self, block_number, chunk_size):
pass
def end_chunk(self, block_number):
"""Save at the end of each block, so we can resume in the case of a crash or CTRL+C"""
# Next time the scanner is started we will resume from this block
self.state["last_scanned_block"] = block_number
# Save the database file for every minute
if time.time() - self.last_save > 60:
self.save()
def process_event(self, block_when: datetime.datetime, event: AttributeDict) -> str:
"""Record a ERC-20 transfer in our database."""
# Events are keyed by their transaction hash and log index
# One transaction may contain multiple events
# and each one of those gets their own log index
# event_name = event.event # "Transfer"
log_index = event.logIndex # Log index within the block
# transaction_index = event.transactionIndex # Transaction index within the block
txhash = event.transactionHash.hex() # Transaction hash
block_number = event.blockNumber
# Convert ERC-20 Transfer event to our internal format
args = event["args"]
transfer = {
"from": args["from"],
"to": args.to,
"value": args.value,
"timestamp": block_when.isoformat(),
}
# Create empty dict as the block that contains all transactions by txhash
if block_number not in self.state["blocks"]:
self.state["blocks"][block_number] = {}
block = self.state["blocks"][block_number]
if txhash not in block:
# We have not yet recorded any transfers in this transaction
# (One transaction may contain multiple events if executed by a smart contract).
# Create a tx entry that contains all events by a log index
self.state["blocks"][block_number][txhash] = {}
# Record ERC-20 transfer in our database
self.state["blocks"][block_number][txhash][log_index] = transfer
# Return a pointer that allows us to look up this event later if needed
return f"{block_number}-{txhash}-{log_index}"
def run():
if len(sys.argv) < 2:
print("Usage: eventscanner.py http://your-node-url")
sys.exit(1)
api_url = sys.argv[1]
# Enable logs to the stdout.
# DEBUG is very verbose level
logging.basicConfig(level=logging.INFO)
provider = HTTPProvider(api_url)
# Remove the default JSON-RPC retry middleware
# as it correctly cannot handle eth_getLogs block range
# throttle down.
provider.middlewares.clear()
web3 = Web3(provider)
# Prepare stub ERC-20 contract object
abi = json.loads(ABI)
ERC20 = web3.eth.contract(abi=abi)
# Restore/create our persistent state
state = JSONifiedState()
state.restore()
# chain_id: int, web3: Web3, abi: dict, state: EventScannerState, events: List, filters: {}, max_chunk_scan_size: int=10000
scanner = EventScanner(
web3=web3,
contract=ERC20,
state=state,
events=[ERC20.events.Transfer],
filters={"address": RCC_ADDRESS},
# How many maximum blocks at the time we request from JSON-RPC
# and we are unlikely to exceed the response size limit of the JSON-RPC server
max_chunk_scan_size=10000
)
# Assume we might have scanned the blocks all the way to the last Ethereum block
# that mined a few seconds before the previous scan run ended.
# Because there might have been a minor Etherueum chain reorganisations
# since the last scan ended, we need to discard
# the last few blocks from the previous scan results.
chain_reorg_safety_blocks = 10
scanner.delete_potentially_forked_block_data(state.get_last_scanned_block() - chain_reorg_safety_blocks)
# Scan from [last block scanned] - [latest ethereum block]
# Note that our chain reorg safety blocks cannot go negative
start_block = max(state.get_last_scanned_block() - chain_reorg_safety_blocks, 0)
end_block = scanner.get_suggested_scan_end_block()
blocks_to_scan = end_block - start_block
print(f"Scanning events from blocks {start_block} - {end_block}")
# Render a progress bar in the console
start = time.time()
with tqdm(total=blocks_to_scan) as progress_bar:
def _update_progress(start, end, current, current_block_timestamp, chunk_size, events_count):
if current_block_timestamp:
formatted_time = current_block_timestamp.strftime("%d-%m-%Y")
else:
formatted_time = "no block time available"
progress_bar.set_description(f"Current block: {current} ({formatted_time}), blocks in a scan batch: {chunk_size}, events processed in a batch {events_count}")
progress_bar.update(chunk_size)
# Run the scan
result, total_chunks_scanned = scanner.scan(start_block, end_block, progress_callback=_update_progress)
state.save()
duration = time.time() - start
print(f"Scanned total {len(result)} Transfer events, in {duration} seconds, total {total_chunks_scanned} chunk scans performed")
run()