From 30eb08e55eeeaefe6245daf3e884fc5f30ea702e Mon Sep 17 00:00:00 2001 From: adel Date: Wed, 31 Jul 2024 13:00:46 +0200 Subject: [PATCH] fix: Price pusher concurrency issue onchain (#180) * fix(price_pusher_concurrency): Concurrency stuff * fix(price_pusher_concurrency): Onchain concurrency issue * fix(price_pusher_concurrency): Logging stuff * fix(price_pusher_concurrency): Fixed units * feat: update price pusher configs --------- Co-authored-by: 0xevolve --- infra/price-pusher/config/config.mainnet.yaml | 7 ++- infra/price-pusher/config/config.sepolia.yaml | 7 ++- pragma-sdk/pragma_sdk/onchain/mixins/nonce.py | 2 +- price-pusher/price_pusher/core/pusher.py | 57 +++++++++++++------ price-pusher/price_pusher/orchestrator.py | 8 +-- price-pusher/tests/unit/test_orchestrator.py | 3 - 6 files changed, 56 insertions(+), 28 deletions(-) diff --git a/infra/price-pusher/config/config.mainnet.yaml b/infra/price-pusher/config/config.mainnet.yaml index 862c37af..710eb03e 100644 --- a/infra/price-pusher/config/config.mainnet.yaml +++ b/infra/price-pusher/config/config.mainnet.yaml @@ -10,11 +10,16 @@ - USDC/USD - USDT/USD - DAI/USD + time_difference: 120 + price_deviation: 0.0025 + +- pairs: + spot: - LORDS/USD - ZEND/USD - NSTR/USD time_difference: 120 - price_deviation: 0.0025 + price_deviation: 0.005 - pairs: future: diff --git a/infra/price-pusher/config/config.sepolia.yaml b/infra/price-pusher/config/config.sepolia.yaml index 862c37af..710eb03e 100644 --- a/infra/price-pusher/config/config.sepolia.yaml +++ b/infra/price-pusher/config/config.sepolia.yaml @@ -10,11 +10,16 @@ - USDC/USD - USDT/USD - DAI/USD + time_difference: 120 + price_deviation: 0.0025 + +- pairs: + spot: - LORDS/USD - ZEND/USD - NSTR/USD time_difference: 120 - price_deviation: 0.0025 + price_deviation: 0.005 - pairs: future: diff --git a/pragma-sdk/pragma_sdk/onchain/mixins/nonce.py b/pragma-sdk/pragma_sdk/onchain/mixins/nonce.py index 1d98373b..e0cb3c16 100644 --- a/pragma-sdk/pragma_sdk/onchain/mixins/nonce.py +++ b/pragma-sdk/pragma_sdk/onchain/mixins/nonce.py @@ -119,7 +119,7 @@ async def get_nonce( block_number=block_number, ) - return int(nonce) + return nonce # type: ignore[no-any-return] async def get_status( self, diff --git a/price-pusher/price_pusher/core/pusher.py b/price-pusher/price_pusher/core/pusher.py index d766c690..55cbdbc1 100644 --- a/price-pusher/price_pusher/core/pusher.py +++ b/price-pusher/price_pusher/core/pusher.py @@ -1,6 +1,10 @@ +import asyncio +import time + from abc import ABC, abstractmethod from typing import List, Optional, Dict +from starknet_py.contract import InvokeResult from pragma_sdk.common.types.client import PragmaClient from pragma_sdk.common.types.entry import Entry @@ -18,6 +22,7 @@ class IPricePusher(ABC): client: PragmaClient consecutive_push_error: int + onchain_lock: asyncio.Lock @abstractmethod async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]: ... @@ -27,35 +32,53 @@ class PricePusher(IPricePusher): def __init__(self, client: PragmaClient) -> None: self.client = client self.consecutive_push_error = 0 + self.onchain_lock = asyncio.Lock() @property def is_publishing_on_chain(self) -> bool: return isinstance(self.client, PragmaOnChainClient) + async def wait_for_publishing_acceptance(self, invocations: List[InvokeResult]): + """ + Waits for all publishing TX to be accepted on-chain. + """ + for invocation in invocations: + nonce = invocation.invoke_transaction.nonce + logger.info( + f"🏋️ PUSHER: ⏳ Waiting for TX {hex(invocation.hash)} (nonce={nonce}) to be accepted..." + ) + await invocation.wait_for_acceptance(check_interval=1) + async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]: """ Push the entries passed as parameter with the internal pragma client. """ - logger.info(f"🏋️ PUSHER: 👷‍♂️ processing {len(entries)} new asset(s) to push...") try: - response = await self.client.publish_entries(entries) + logger.info(f"🏋️ PUSHER: 👷‍♂️ processing {len(entries)} new asset(s) to push...") + if self.is_publishing_on_chain: - last_invocation = response[-1] - logger.info( - f"🏋️ PUSHER: ⏳ waiting TX hash {hex(last_invocation.hash)} to be executed..." - ) - await last_invocation.wait_for_acceptance( - check_interval=1, retries=WAIT_FOR_ACCEPTANCE_MAX_RETRIES - ) - logger.info(f"🏋️ PUSHER: ✅ Successfully published {len(entries)} entrie(s)!") + async with self.onchain_lock: + start_t = time.time() + response = await self.client.publish_entries(entries) + await self.wait_for_publishing_acceptance(response) + else: + start_t = time.time() + response = await self.client.publish_entries(entries) + + end_t = time.time() + logger.info( + f"🏋️ PUSHER: ✅ Successfully published {len(entries)} entrie(s)! " + f"(took {(end_t - start_t):.2f}s)" + ) + self.consecutive_push_error = 0 return response + except Exception as e: logger.error(f"🏋️ PUSHER: ⛔ could not publish entrie(s): {e}") self.consecutive_push_error += 1 - if self.consecutive_push_error >= CONSECUTIVES_PUSH_ERRORS_LIMIT: - raise ValueError( - "⛔ Pusher tried to push for " - f"{self.consecutive_push_error} times and still failed. " - "Pusher does not seems to work? Stopping here." - ) - return None + + if self.consecutive_push_error >= CONSECUTIVES_PUSH_ERRORS_LIMIT: + raise ValueError( + f"⛔ Pusher failed {self.consecutive_push_error} times in a row. Stopping." + ) + return None diff --git a/price-pusher/price_pusher/orchestrator.py b/price-pusher/price_pusher/orchestrator.py index 309174d9..8ca55dab 100644 --- a/price-pusher/price_pusher/orchestrator.py +++ b/price-pusher/price_pusher/orchestrator.py @@ -99,17 +99,15 @@ async def _handle_listener(self, listener: PriceListener) -> None: """ while True: await listener.notification_event.wait() - assets_to_push = listener.price_config.get_all_assets() logger.info( f"💡 Notification received from LISTENER [{listener.id}] ! " "Sending entries into queue for: " - f"{assets_to_push}" + f"{listener.data_config}" ) - entries_to_push = self._flush_entries_for_assets(assets_to_push) + entries_to_push = self._flush_entries_for_assets(listener.data_config) if len(entries_to_push) > 0: await self.push_queue.put(entries_to_push) - # Wait for the task to be completed before clearing the notification - await self.push_queue.join() + await self.push_queue.join() # Wait for the entries to be processed listener.notification_event.clear() async def _pusher_service(self) -> None: diff --git a/price-pusher/tests/unit/test_orchestrator.py b/price-pusher/tests/unit/test_orchestrator.py index ebc3aaf3..ceba9c03 100644 --- a/price-pusher/tests/unit/test_orchestrator.py +++ b/price-pusher/tests/unit/test_orchestrator.py @@ -118,9 +118,6 @@ async def test_handle_listener(orchestrator, mock_listener, caplog): except asyncio.CancelledError: pass - # Assertions - mock_listener.price_config.get_all_assets.assert_called_once() - # Check logs assert any( "💡 Notification received from LISTENER" in record.message for record in caplog.records