Merge pull request #68 from fjarri/get-logs

More support for event logs

fjarri authored Feb 10, 2024
2 parents 9f9c42d + ffdb7ca commit 755d249

Showing 6 changed files with 255 additions and 99 deletions.
6 changes: 5 additions & 1 deletion docs/changelog.rst
@@ -6,7 +6,7 @@ Changelog
~~~~~~~~~~~~~~~~~~

Changed
~~~~~~~
^^^^^^^

- Added an explicit ``typing_extensions`` dependency. (PR_57_)
- Various boolean arguments are now keyword-only to prevent usage errors. (PR_57_)
@@ -30,6 +30,8 @@ Added
- Expose ``Provider`` at the top level. (PR_63_)
- ``eth_getCode`` support (as ``ClientSession.eth_get_code()``). (PR_64_)
- ``eth_getStorageAt`` support (as ``ClientSession.eth_get_storage_at()``). (PR_64_)
- Support for the ``logs`` field in ``TxReceipt``. (PR_68_)
- ``ClientSession.eth_get_logs()`` and ``eth_get_filter_logs()``. (PR_68_)


Fixed
@@ -40,6 +42,7 @@ Fixed
- The transaction tip being set larger than the max gas price (which some providers don't like). (PR_64_)
- Decoding error when fetching pending transactions. (PR_65_)
- Decoding error when fetching pending blocks. (PR_67_)
- Get the default nonce based on the pending block, not the latest one. (PR_68_)


.. _PR_51: https://github.com/fjarri/pons/pull/51
@@ -54,6 +57,7 @@ Fixed
.. _PR_64: https://github.com/fjarri/pons/pull/64
.. _PR_65: https://github.com/fjarri/pons/pull/65
.. _PR_67: https://github.com/fjarri/pons/pull/67
.. _PR_68: https://github.com/fjarri/pons/pull/68


0.7.0 (09-07-2023)
111 changes: 78 additions & 33 deletions pons/_client.py
@@ -547,7 +547,7 @@ async def broadcast_transfer(
# TODO (#19): implement gas strategies
max_gas_price = await self.eth_gas_price()
max_tip = min(Amount.gwei(1), max_gas_price)
nonce = await self.eth_get_transaction_count(signer.address, Block.LATEST)
nonce = await self.eth_get_transaction_count(signer.address, Block.PENDING)
tx: Dict[str, Union[int, str]] = {
"type": 2, # EIP-2930 transaction
"chainId": rpc_encode_quantity(chain_id),
@@ -613,7 +613,7 @@ async def deploy(
# TODO (#19): implement gas strategies
max_gas_price = await self.eth_gas_price()
max_tip = min(Amount.gwei(1), max_gas_price)
nonce = await self.eth_get_transaction_count(signer.address, Block.LATEST)
nonce = await self.eth_get_transaction_count(signer.address, Block.PENDING)
tx: Dict[str, Union[int, str]] = {
"type": 2, # EIP-2930 transaction
"chainId": rpc_encode_quantity(chain_id),
@@ -665,7 +665,7 @@ async def broadcast_transact(
# TODO (#19): implement gas strategies
max_gas_price = await self.eth_gas_price()
max_tip = min(Amount.gwei(1), max_gas_price)
nonce = await self.eth_get_transaction_count(signer.address, Block.LATEST)
nonce = await self.eth_get_transaction_count(signer.address, Block.PENDING)
tx: Dict[str, Union[int, str]] = {
"type": 2, # EIP-2930 transaction
"chainId": rpc_encode_quantity(chain_id),
@@ -739,6 +739,49 @@ async def transact(

return results

def _encode_filter_params(
self,
source: Optional[Union[Address, Iterable[Address]]],
event_filter: Optional[EventFilter],
from_block: Union[int, Block],
to_block: Union[int, Block],
) -> JSON:
params: Dict[str, Any] = {
"fromBlock": rpc_encode_block(from_block),
"toBlock": rpc_encode_block(to_block),
}
if isinstance(source, Address):
params["address"] = source.rpc_encode()
elif source:
params["address"] = [address.rpc_encode() for address in source]
if event_filter:
encoded_topics: List[Optional[List[str]]] = []
for topic in event_filter.topics:
if topic is None:
encoded_topics.append(None)
else:
encoded_topics.append([elem.rpc_encode() for elem in topic])
params["topics"] = encoded_topics
return params
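
The helper above only assembles the JSON object that ``eth_getLogs`` and ``eth_newFilter`` expect. As a rough illustration (the concrete values below are made up, except the first topic, which is the well-known ERC-20 ``Transfer`` event topic), filtering one contract address for a single fixed event topic would produce something like:

    # Roughly what _encode_filter_params() returns for a single contract address
    # and an event filter with one fixed topic:
    params = {
        "fromBlock": "0x0",
        "toBlock": "latest",
        "address": "0xdddddddddddddddddddddddddddddddddddddddd",  # made-up address
        "topics": [
            # Topic 0, the event's selector; an inner list means "any of these values".
            ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],
            # None at a position matches any value for that topic.
            None,
        ],
    }
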

@rpc_call("eth_getLogs")
async def eth_get_logs(
self,
source: Optional[Union[Address, Iterable[Address]]] = None,
event_filter: Optional[EventFilter] = None,
from_block: Union[int, Block] = Block.LATEST,
to_block: Union[int, Block] = Block.LATEST,
) -> Tuple[LogEntry, ...]:
"""Calls the ``eth_getLogs`` RPC method."""
params = self._encode_filter_params(
source=source, event_filter=event_filter, from_block=from_block, to_block=to_block
)
result = await self._provider_session.rpc("eth_getLogs", params)
# TODO: this will go away with generalized RPC decoding.
if not isinstance(result, list):
raise InvalidResponse(f"Expected a list as a response, got {type(result).__name__}")
return tuple(LogEntry.rpc_decode(ResponseDict(elem)) for elem in result)
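
A short usage sketch for the new ``eth_get_logs()``, assuming an already connected ``ClientSession`` bound to ``session`` and some contract ``Address`` bound to ``contract_address`` (both names are assumptions for illustration, not part of this diff):

    # Fetch every log emitted by one contract from the genesis block onwards.
    logs = await session.eth_get_logs(
        source=contract_address,  # a single Address, or an iterable of addresses
        from_block=0,
        to_block=Block.LATEST,
    )
    for entry in logs:
        # Each element is a LogEntry (see pons/_entities.py below).
        print(entry.block_number, entry.address, entry.topics, entry.data.hex())
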

@rpc_call("eth_newBlockFilter")
async def eth_new_block_filter(self) -> BlockFilter:
"""Calls the ``eth_newBlockFilter`` RPC method."""
@@ -764,49 +807,51 @@ async def eth_new_filter(
to_block: Union[int, Block] = Block.LATEST,
) -> LogFilter:
"""Calls the ``eth_newFilter`` RPC method."""
params: Dict[str, Any] = {
"fromBlock": rpc_encode_block(from_block),
"toBlock": rpc_encode_block(to_block),
}
if isinstance(source, Address):
params["address"] = source.rpc_encode()
elif source:
params["address"] = [address.rpc_encode() for address in source]
if event_filter:
encoded_topics: List[Optional[List[str]]] = []
for topic in event_filter.topics:
if topic is None:
encoded_topics.append(None)
else:
encoded_topics.append([elem.rpc_encode() for elem in topic])
params["topics"] = encoded_topics

params = self._encode_filter_params(
source=source, event_filter=event_filter, from_block=from_block, to_block=to_block
)
result, provider_path = await self._provider_session.rpc_and_pin("eth_newFilter", params)
filter_id = LogFilterId.rpc_decode(result)
return LogFilter(id_=filter_id, provider_path=provider_path)

@rpc_call("eth_getFilterChangers")
def _parse_filter_result(
self,
filter_: Union[BlockFilter, PendingTransactionFilter, LogFilter],
result: JSON,
) -> Union[Tuple[BlockHash, ...], Tuple[TxHash, ...], Tuple[LogEntry, ...]]:
# TODO: this will go away with generalized RPC decoding.
if not isinstance(result, list):
raise InvalidResponse(f"Expected a list as a response, got {type(result).__name__}")

if isinstance(filter_, BlockFilter):
return tuple(BlockHash.rpc_decode(elem) for elem in result)
if isinstance(filter_, PendingTransactionFilter):
return tuple(TxHash.rpc_decode(elem) for elem in result)
return tuple(LogEntry.rpc_decode(ResponseDict(elem)) for elem in result)

@rpc_call("eth_getFilterLogs")
async def eth_get_filter_logs(
self, filter_: Union[BlockFilter, PendingTransactionFilter, LogFilter]
) -> Union[Tuple[BlockHash, ...], Tuple[TxHash, ...], Tuple[LogEntry, ...]]:
"""Calls the ``eth_getFilterLogs`` RPC method."""
result = await self._provider_session.rpc_at_pin(
filter_.provider_path, "eth_getFilterLogs", filter_.id_.rpc_encode()
)
return self._parse_filter_result(filter_, result)

@rpc_call("eth_getFilterChanges")
async def eth_get_filter_changes(
self, filter_: Union[BlockFilter, PendingTransactionFilter, LogFilter]
) -> Union[Tuple[BlockHash, ...], Tuple[TxHash, ...], Tuple[LogEntry, ...]]:
"""
Calls the ``eth_getFilterChangers`` RPC method.
Calls the ``eth_getFilterChanges`` RPC method.
Depending on what ``filter_`` was, returns a tuple of corresponding results.
"""
# TODO: split into separate functions with specific return types?
results = await self._provider_session.rpc_at_pin(
result = await self._provider_session.rpc_at_pin(
filter_.provider_path, "eth_getFilterChanges", filter_.id_.rpc_encode()
)

# TODO: this will go away with generalized RPC decoding.
if not isinstance(results, list):
raise InvalidResponse(f"Expected a list as a response, got {type(results).__name__}")

if isinstance(filter_, BlockFilter):
return tuple(BlockHash.rpc_decode(elem) for elem in results)
if isinstance(filter_, PendingTransactionFilter):
return tuple(TxHash.rpc_decode(elem) for elem in results)
return tuple(LogEntry.rpc_decode(ResponseDict(elem)) for elem in results)
return self._parse_filter_result(filter_, result)

async def iter_blocks(self, poll_interval: int = 1) -> AsyncIterator[BlockHash]:
"""Yields hashes of new blocks being mined."""
129 changes: 68 additions & 61 deletions pons/_entities.py
@@ -450,6 +450,67 @@ def rpc_decode(cls, val: ResponseDict) -> "BlockInfo":
)


class LogEntry(NamedTuple):
"""Log entry metadata."""

removed: bool
"""
``True`` if log was removed, due to a chain reorganization.
``False`` if it is a valid log.
"""

address: Address
"""
The contract address from which this log originated.
"""

data: bytes
"""ABI-packed non-indexed arguments of the event."""

topics: Tuple[LogTopic, ...]
"""
Values of indexed event fields.
For a named event, the first topic is the event's selector.
"""

# In the docs of major providers (Infura, Alchemy, Quicknode) it is claimed
# that the following fields can be null if "it is a pending log".
# I could not reproduce such behavior, so for now they're staying non-nullable.

log_index: int
"""Log's position in the block."""

transaction_index: int
"""Transaction's position in the block."""

transaction_hash: TxHash
"""Hash of the transactions this log was created from."""

block_hash: BlockHash
"""Hash of the block where this log was in."""

block_number: int
"""The block number where this log was."""

@classmethod
def rpc_decode(cls, val: ResponseDict) -> "LogEntry":
topics = val["topics"]
if not isinstance(topics, Iterable):
raise RPCDecodingError(f"`topics` in a log entry must be an iterable, got {topics}")

return cls(
removed=rpc_decode_bool(val["removed"]),
log_index=rpc_decode_quantity(val["logIndex"]),
transaction_index=rpc_decode_quantity(val["transactionIndex"]),
transaction_hash=TxHash.rpc_decode(val["transactionHash"]),
block_hash=BlockHash.rpc_decode(val["blockHash"]),
block_number=rpc_decode_quantity(val["blockNumber"]),
address=Address.rpc_decode(val["address"]),
data=rpc_decode_data(val["data"]),
topics=tuple(LogTopic.rpc_decode(topic) for topic in topics),
)
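
For reference, this is roughly what a single raw entry from ``eth_getLogs`` looks like and how it maps onto the fields above. The hex values are made up, and ``ResponseDict`` is an internal helper used by ``rpc_decode()``, so application code would normally receive ready-made ``LogEntry`` objects from ``ClientSession`` rather than decode by hand:

    raw = {
        "removed": False,
        "logIndex": "0x0",
        "transactionIndex": "0x1",
        "transactionHash": "0x" + "11" * 32,  # made-up 32-byte hash
        "blockHash": "0x" + "22" * 32,        # made-up 32-byte hash
        "blockNumber": "0x10",
        "address": "0x" + "33" * 20,          # made-up 20-byte contract address
        "data": "0x",                          # no non-indexed arguments in this example
        "topics": ["0x" + "44" * 32],          # first topic: the event's selector
    }
    entry = LogEntry.rpc_decode(ResponseDict(raw))
    assert entry.block_number == 0x10 and not entry.removed
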


class TxReceipt(NamedTuple):
"""Transaction receipt."""

@@ -496,9 +557,15 @@ class TxReceipt(NamedTuple):
succeeded: bool
"""Whether the transaction was successful."""

logs: Tuple[LogEntry, ...]

@classmethod
def rpc_decode(cls, val: ResponseDict) -> "TxReceipt":
contract_address = val["contractAddress"]
logs = val["logs"]
if not isinstance(logs, Iterable):
raise RPCDecodingError(f"`logs` in a tx receipt must be an iterable, got {logs}")

return cls(
block_hash=BlockHash.rpc_decode(val["blockHash"]),
block_number=rpc_decode_quantity(val["blockNumber"]),
@@ -512,67 +579,7 @@ def rpc_decode(cls, val: ResponseDict) -> "TxReceipt":
transaction_index=rpc_decode_quantity(val["transactionIndex"]),
type_=rpc_decode_quantity(val["type"]),
succeeded=(rpc_decode_quantity(val["status"]) == 1),
)


class LogEntry(NamedTuple):
"""Log entry metadata."""

removed: bool
"""
``True`` if log was removed, due to a chain reorganization.
``False`` if it is a valid log.
"""

address: Address
"""
The contract address from which this log originated.
"""

data: bytes
"""ABI-packed non-indexed arguments of the event."""

topics: Tuple[LogTopic, ...]
"""
Values of indexed event fields.
For a named event, the first topic is the event's selector.
"""

# In the docs of major providers (Infura, Alchemy, Quicknode) it is claimed
# that the following fields can be null if "it is a pending log".
# I could not reproduce such behavior, so for now they're staying non-nullable.

log_index: int
"""Log's position in the block."""

transaction_index: int
"""Transaction's position in the block."""

transaction_hash: TxHash
"""Hash of the transactions this log was created from."""

block_hash: BlockHash
"""Hash of the block where this log was in."""

block_number: int
"""The block number where this log was."""

@classmethod
def rpc_decode(cls, val: ResponseDict) -> "LogEntry":
topics = val["topics"]
if not isinstance(topics, Iterable):
raise RPCDecodingError(f"`topics` in a log entry must be an iterable, got {topics}")

return cls(
removed=rpc_decode_bool(val["removed"]),
log_index=rpc_decode_quantity(val["logIndex"]),
transaction_index=rpc_decode_quantity(val["transactionIndex"]),
transaction_hash=TxHash.rpc_decode(val["transactionHash"]),
block_hash=BlockHash.rpc_decode(val["blockHash"]),
block_number=rpc_decode_quantity(val["blockNumber"]),
address=Address.rpc_decode(val["address"]),
data=rpc_decode_data(val["data"]),
topics=tuple(LogTopic.rpc_decode(topic) for topic in topics),
logs=tuple(LogEntry.rpc_decode(ResponseDict(entry)) for entry in logs),
)
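
With the new ``logs`` field in place, event data becomes reachable straight from a transaction receipt. A sketch assuming a connected ``session`` and the ``tx_hash`` of an already mined transaction; ``eth_get_transaction_receipt()`` is assumed here to be the session method that returns a ``TxReceipt`` (it is not part of this diff):

    receipt = await session.eth_get_transaction_receipt(tx_hash)
    if receipt is not None and receipt.succeeded:
        for log in receipt.logs:
            # Each element is a LogEntry, the same type eth_get_logs() returns.
            print(log.log_index, log.address, log.topics, log.data.hex())
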

