Skip to content

Commit

Permalink
Merge pull request #737 from valory-xyz/feat/handle-rpc-timeouts
Browse files Browse the repository at this point in the history
Implement a custom filtering method
  • Loading branch information
Adamantios authored Apr 22, 2024
2 parents 0673f56 + 55bbd35 commit 614dd04
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 10 deletions.
28 changes: 27 additions & 1 deletion aea/crypto/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2021-2023 Valory AG
# Copyright 2021-2024 Valory AG
# Copyright 2018-2019 Fetch.AI Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -529,6 +529,32 @@ def get_transaction_transfer_logs(
:param target_address: optional address to filter transfer events to just those that affect it
"""

@abstractmethod
def filter_event(
self,
event: Any,
match_single: Dict[str, Any],
match_any: Dict[str, Any],
to_block: int,
from_block: int,
batch_size: int,
max_retries: int,
reduce_factor: float,
timeout: int,
) -> Optional[JSONLike]:
"""Filter an event using batching to avoid RPC timeouts.
:param event: the event to filter for.
:param match_single: the filter parameters with value checking against the event abi. It allows for defining a single match value.
:param match_any: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
:param to_block: the block to which to filter.
:param from_block: the block from which to start filtering.
:param batch_size: the blocks' batch size of the filtering.
:param max_retries: the maximum number of retries.
:param reduce_factor: the percentage by which the batch size is reduced in case of a timeout.
:param timeout: a timeout in seconds to interrupt the operation in case the RPC request hangs.
"""


class FaucetApi(ABC):
"""Interface for testnet faucet APIs."""
Expand Down
26 changes: 26 additions & 0 deletions docs/api/crypto/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,32 @@ Get all transfer events derived from a transaction.
- `tx_hash`: the transaction hash
- `target_address`: optional address to filter transfer events to just those that affect it

<a id="aea.crypto.base.LedgerApi.filter_event"></a>

#### filter`_`event

```python
@abstractmethod
def filter_event(event: Any, match_single: Dict[str, Any],
match_any: Dict[str, Any], to_block: int, from_block: int,
batch_size: int, max_retries: int, reduce_factor: float,
timeout: int) -> Optional[JSONLike]
```

Filter an event using batching to avoid RPC timeouts.

**Arguments**:

- `event`: the event to filter for.
- `match_single`: the filter parameters with value checking against the event abi. It allows for defining a single match value.
- `match_any`: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
- `to_block`: the block to which to filter.
- `from_block`: the block from which to start filtering.
- `batch_size`: the blocks' batch size of the filtering.
- `max_retries`: the maximum number of retries.
- `reduce_factor`: the percentage by which the batch size is reduced in case of a timeout.
- `timeout`: a timeout in seconds to interrupt the operation in case the RPC request hangs.

<a id="aea.crypto.base.FaucetApi"></a>

## FaucetApi Objects
Expand Down
25 changes: 25 additions & 0 deletions docs/api/plugins/aea_ledger_cosmos/cosmos.md
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,31 @@ This operation is not supported for cosmos.
- `raise_on_try`: whether the method will raise or log on error.
- `kwargs`: the keyword arguments.

<a id="plugins.aea-ledger-cosmos.aea_ledger_cosmos.cosmos._CosmosApi.filter_event"></a>

#### filter`_`event

```python
def filter_event(event: Any, match_single: Dict[str, Any],
match_any: Dict[str, Any], to_block: int, from_block: int,
batch_size: int, max_retries: int, reduce_factor: float,
timeout: int) -> Optional[JSONLike]
```

Filter an event using batching to avoid RPC timeouts.

**Arguments**:

- `event`: the event to filter for.
- `match_single`: the filter parameters with value checking against the event abi. It allows for defining a single match value.
- `match_any`: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
- `to_block`: the block to which to filter.
- `from_block`: the block from which to start filtering.
- `batch_size`: the blocks' batch size of the filtering.
- `max_retries`: the maximum number of retries.
- `reduce_factor`: the percentage by which the batch size is reduced in case of a timeout.
- `timeout`: a timeout in seconds to interrupt the operation in case the RPC request hangs.

<a id="plugins.aea-ledger-cosmos.aea_ledger_cosmos.cosmos.CosmosApi"></a>

## CosmosApi Objects
Expand Down
67 changes: 67 additions & 0 deletions docs/api/plugins/aea_ledger_ethereum/ethereum.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ def get_gas_price_strategy(

Get the gas price strategy.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.rpc_call_with_timeout"></a>

#### rpc`_`call`_`with`_`timeout

```python
def rpc_call_with_timeout(func: Callable, timeout: int) -> Any
```

Execute an RPC call with a timeout.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.match_items"></a>

#### match`_`items

```python
def match_items(filter_: EventFilterBuilder, event_name: str,
method_to_matches: Dict[str, Dict[str, Any]])
```

Build filters for the given match dictionary.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.SignedTransactionTranslator"></a>

## SignedTransactionTranslator Objects
Expand Down Expand Up @@ -942,6 +963,52 @@ This operation is not supported for ethereum. Please use the ethereum_flashbots
- `raise_on_try`: whether the method will raise or log on error
- `kwargs`: the keyword arguments.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.EthereumApi.batch_filter_wrapper"></a>

#### batch`_`filter`_`wrapper

```python
def batch_filter_wrapper(event: ContractEvent, match_single: Dict[str, Any],
match_any: Dict[str, Any], to_block: BlockNumber,
from_block: BlockNumber) -> Any
```

A wrapper for a single batch's event filtering operation.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.EthereumApi.filter_event"></a>

#### filter`_`event

```python
def filter_event(event: ContractEvent,
match_single: Dict[str, Any],
match_any: Dict[str, Any],
to_block: BlockNumber,
from_block: BlockNumber = 0,
batch_size: int = 5_000,
max_retries: int = 5,
reduce_factor: float = 0.25,
timeout: int = 5 * 60) -> Optional[JSONLike]
```

Filter an event using batching to avoid RPC timeouts.

**Arguments**:

- `event`: the event to filter for.
- `match_single`: the filter parameters with value checking against the event abi. It allows for defining a single match value.
- `match_any`: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
- `to_block`: the block to which to filter.
- `from_block`: the block from which to start filtering.
- `batch_size`: the blocks' batch size of the filtering.
- `max_retries`: the maximum number of retries.
- `reduce_factor`: the percentage by which the batch size is reduced in case of a timeout.
- `timeout`: a timeout in seconds to interrupt the operation in case the RPC request hangs.

**Returns**:

the filtering result.

<a id="plugins.aea-ledger-ethereum.aea_ledger_ethereum.ethereum.EthereumFaucetApi"></a>

## EthereumFaucetApi Objects
Expand Down
25 changes: 25 additions & 0 deletions docs/api/plugins/aea_ledger_fetchai/fetchai.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,31 @@ This operation is not supported for fetchai.
- `raise_on_try`: whether the method will raise or log on error.
- `kwargs`: the keyword arguments.

<a id="plugins.aea-ledger-fetchai.aea_ledger_fetchai.fetchai.FetchAIApi.filter_event"></a>

#### filter`_`event

```python
def filter_event(event: Any, match_single: Dict[str, Any],
match_any: Dict[str, Any], to_block: int, from_block: int,
batch_size: int, max_retries: int, reduce_factor: float,
timeout: int) -> Optional[JSONLike]
```

Filter an event using batching to avoid RPC timeouts.

**Arguments**:

- `event`: the event to filter for.
- `match_single`: the filter parameters with value checking against the event abi. It allows for defining a single match value.
- `match_any`: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
- `to_block`: the block to which to filter.
- `from_block`: the block from which to start filtering.
- `batch_size`: the blocks' batch size of the filtering.
- `max_retries`: the maximum number of retries.
- `reduce_factor`: the percentage by which the batch size is reduced in case of a timeout.
- `timeout`: a timeout in seconds to interrupt the operation in case the RPC request hangs.

<a id="plugins.aea-ledger-fetchai.aea_ledger_fetchai.fetchai.FetchAIFaucetApi"></a>

## FetchAIFaucetApi Objects
Expand Down
25 changes: 25 additions & 0 deletions docs/api/plugins/aea_ledger_solana/solana.md
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,28 @@ Get all transfer events derived from a transaction.

the transfer logs

<a id="plugins.aea-ledger-solana.aea_ledger_solana.solana.SolanaApi.filter_event"></a>

#### filter`_`event

```python
def filter_event(event: Any, match_single: Dict[str, Any],
match_any: Dict[str, Any], to_block: int, from_block: int,
batch_size: int, max_retries: int, reduce_factor: float,
timeout: int) -> Optional[JSONLike]
```

Filter an event using batching to avoid RPC timeouts.

**Arguments**:

- `event`: the event to filter for.
- `match_single`: the filter parameters with value checking against the event abi. It allows for defining a single match value.
- `match_any`: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
- `to_block`: the block to which to filter.
- `from_block`: the block from which to start filtering.
- `batch_size`: the blocks' batch size of the filtering.
- `max_retries`: the maximum number of retries.
- `reduce_factor`: the percentage by which the batch size is reduced in case of a timeout.
- `timeout`: a timeout in seconds to interrupt the operation in case the RPC request hangs.

10 changes: 5 additions & 5 deletions docs/package_list.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
| contract/fetchai/erc1155/0.22.0 | `bafybeiff7a6xncyad53o2r7lekpnhexcspze6ocy55xtpzqeuacnlpunm4` |
| connection/fetchai/gym/0.19.0 | `bafybeicqqvl4tt3qbulnkoffciagmfd6p3hxxi3i2mrrqtnbycv757pn6y` |
| connection/fetchai/stub/0.21.0 | `bafybeibybboiwgklfiqpkkcw6rwj65s5jalzfzf6mh6fstxdlt6habzwvy` |
| connection/valory/ledger/0.19.0 | `bafybeic3ft7l7ca3qgnderm4xupsfmyoihgi27ukotnz7b5hdczla2enya` |
| connection/valory/ledger/0.19.0 | `bafybeig7woeog4srdby75hpjkmx4rhpkzncbf4h2pm5r6varsp26pf2uhu` |
| connection/valory/http_server/0.22.0 | `bafybeihpgu56ovmq4npazdbh6y6ru5i7zuv6wvdglpxavsckyih56smu7m` |
| connection/valory/p2p_libp2p/0.1.0 | `bafybeic2u7azbwjny2nhaltqnbohlvysx3x6ectzbege7sxwrbzcz4lcma` |
| connection/valory/p2p_libp2p_client/0.1.0 | `bafybeid3xg5k2ol5adflqloy75ibgljmol6xsvzvezebsg7oudxeeolz7e` |
Expand All @@ -26,12 +26,12 @@
| skill/fetchai/error_test_skill/0.1.0 | `bafybeihsbtlpe7h6fsvoxban5rilkmwviwkokul5cqym6atoolirontiyu` |
| skill/fetchai/gym/0.20.0 | `bafybeie7y2fsxfuhsqxqcaluo5exskmrm5q3a6e2hfcskcuvzvxjjhijh4` |
| skill/fetchai/http_echo/0.20.0 | `bafybeicfiri2juaqh3azeit3z3rf44kgxdo6oj4lgxjgvnowq6m7j47qrm` |
| skill/fetchai/erc1155_client/0.28.0 | `bafybeiauu446slcix3khzdqlgbt5ab323ik5fc3s7s4lxoanecrewfktju` |
| skill/fetchai/erc1155_deploy/0.30.0 | `bafybeifgsf5wp6lb6ztkf7orbhv2wc7b4mrxosgyt6yrnjclmssmsqru2e` |
| skill/fetchai/erc1155_client/0.28.0 | `bafybeigwh2lnnvaimjhrf6eibhy7wohfokvkwoabjr5t2vhyrzqmlu4tg4` |
| skill/fetchai/erc1155_deploy/0.30.0 | `bafybeiepibtz2uioupucjtmo4n6dyxnx2m26bausegojmw6su6mtjxkvpe` |
| skill/fetchai/error/0.17.0 | `bafybeicboomvykqhel3otyv4qg5t3hzpo6kmn5bk4ljluithhuieu7flsm` |
| skill/fetchai/fipa_dummy_buyer/0.2.0 | `bafybeidgso7lo5ay44mbxsp3lxilrgeek3ye44e6wus2ayq6kyxfvc3vjm` |
| skill/fetchai/generic_buyer/0.26.0 | `bafybeif56kwbxbtuhjzd7dohivfdzlkvgprnugymlyizazudzkded3nblm` |
| skill/fetchai/generic_seller/0.27.0 | `bafybeic6sgtjyd5j4xqwuruijtqnl22y3sfdbf3mnrkchu2x4bx7eo2t6e` |
| skill/fetchai/generic_buyer/0.26.0 | `bafybeidnlxecybia3nkvakemei74z7vnuszql4pmne5xiuxypjmklb27ei` |
| skill/fetchai/generic_seller/0.27.0 | `bafybeihs37oub5qnwybyrkfxlqnjbjcfjqfwtwmdogtsbexo3nguhdsnc4` |
| skill/fetchai/task_test_skill/0.1.0 | `bafybeidv77u2xl52mnxakwvh7fuh46aiwfpteyof4eaptfd4agoi6cdble` |
| agent/fetchai/error_test/0.1.0 | `bafybeiecm675ndzbh35jkejtxn4ughoutztltjhgwzfbp57okabedjmnpq` |
| agent/fetchai/gym_aea/0.25.0 | `bafybeibzn3qomqmkaksgpd3gn6aijffvvw7rojswhoytiovohuc737fvfm` |
Expand Down
30 changes: 29 additions & 1 deletion plugins/aea-ledger-cosmos/aea_ledger_cosmos/cosmos.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2021-2023 Valory AG
# Copyright 2021-2024 Valory AG
# Copyright 2018-2019 Fetch.AI Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -1563,6 +1563,34 @@ def send_signed_transactions(
f"Sending a bundle of transactions is not supported for the {self.identifier} plugin"
)

def filter_event(
self,
event: Any,
match_single: Dict[str, Any],
match_any: Dict[str, Any],
to_block: int,
from_block: int,
batch_size: int,
max_retries: int,
reduce_factor: float,
timeout: int,
) -> Optional[JSONLike]:
"""Filter an event using batching to avoid RPC timeouts.
:param event: the event to filter for.
:param match_single: the filter parameters with value checking against the event abi. It allows for defining a single match value.
:param match_any: the filter parameters with value checking against the event abi. It allows for defining multiple match values.
:param to_block: the block to which to filter.
:param from_block: the block from which to start filtering.
:param batch_size: the blocks' batch size of the filtering.
:param max_retries: the maximum number of retries.
:param reduce_factor: the percentage by which the batch size is reduced in case of a timeout.
:param timeout: a timeout in seconds to interrupt the operation in case the RPC request hangs.
"""
raise NotImplementedError( # pragma: nocover
f"Custom events' filtering is not supported for the {self.identifier} plugin"
)


class CosmosApi(_CosmosApi, CosmosHelper):
"""Class to interact with the Cosmos SDK via a HTTP APIs."""
Expand Down
Loading

0 comments on commit 614dd04

Please sign in to comment.