Skip to content

Commit

Permalink
fix: Price pusher concurrency issue onchain (#180)
Browse files Browse the repository at this point in the history
* fix(price_pusher_concurrency): Concurrency stuff

* fix(price_pusher_concurrency): Onchain concurrency issue

* fix(price_pusher_concurrency): Logging stuff

* fix(price_pusher_concurrency): Fixed units

* feat: update price pusher configs

---------

Co-authored-by: 0xevolve <[email protected]>
  • Loading branch information
akhercha and EvolveArt authored Jul 31, 2024
1 parent c5e479f commit 30eb08e
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 28 deletions.
7 changes: 6 additions & 1 deletion infra/price-pusher/config/config.mainnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
- USDC/USD
- USDT/USD
- DAI/USD
time_difference: 120
price_deviation: 0.0025

- pairs:
spot:
- LORDS/USD
- ZEND/USD
- NSTR/USD
time_difference: 120
price_deviation: 0.0025
price_deviation: 0.005

- pairs:
future:
Expand Down
7 changes: 6 additions & 1 deletion infra/price-pusher/config/config.sepolia.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
- USDC/USD
- USDT/USD
- DAI/USD
time_difference: 120
price_deviation: 0.0025

- pairs:
spot:
- LORDS/USD
- ZEND/USD
- NSTR/USD
time_difference: 120
price_deviation: 0.0025
price_deviation: 0.005

- pairs:
future:
Expand Down
2 changes: 1 addition & 1 deletion pragma-sdk/pragma_sdk/onchain/mixins/nonce.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def get_nonce(
block_number=block_number,
)

return int(nonce)
return nonce # type: ignore[no-any-return]

async def get_status(
self,
Expand Down
57 changes: 40 additions & 17 deletions price-pusher/price_pusher/core/pusher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import time

from abc import ABC, abstractmethod

from typing import List, Optional, Dict
from starknet_py.contract import InvokeResult

from pragma_sdk.common.types.client import PragmaClient
from pragma_sdk.common.types.entry import Entry
Expand All @@ -18,6 +22,7 @@
class IPricePusher(ABC):
    """Interface for price pushers: wraps a Pragma client and pushes entries.

    Implementations must provide ``update_price_feeds``; concrete state
    (client, error counter, lock) is declared here for type-checkers.
    """

    # Client used to publish entries (on-chain or off-chain).
    client: PragmaClient
    # Number of publish failures in a row; reset on a successful push.
    consecutive_push_error: int
    # Lock used to serialize on-chain publishing across concurrent pushes.
    onchain_lock: asyncio.Lock

    @abstractmethod
    async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]: ...
Expand All @@ -27,35 +32,53 @@ class PricePusher(IPricePusher):
def __init__(self, client: PragmaClient) -> None:
    """Bind the pusher to *client* and reset its locking/failure state."""
    self.client = client
    self.onchain_lock = asyncio.Lock()
    self.consecutive_push_error = 0

@property
def is_publishing_on_chain(self) -> bool:
    """Whether the wrapped client publishes entries on-chain."""
    uses_onchain_client = isinstance(self.client, PragmaOnChainClient)
    return uses_onchain_client

async def wait_for_publishing_acceptance(self, invocations: List[InvokeResult]):
    """Block until every publishing transaction in *invocations* is accepted on-chain.

    Transactions are awaited sequentially, in the order they were sent.
    """
    for invoke in invocations:
        tx_nonce = invoke.invoke_transaction.nonce
        message = (
            f"🏋️ PUSHER: ⏳ Waiting for TX {hex(invoke.hash)} (nonce={tx_nonce}) to be accepted..."
        )
        logger.info(message)
        await invoke.wait_for_acceptance(check_interval=1)

async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]:
    """Publish *entries* with the internal pragma client.

    On-chain publishing is serialized through ``self.onchain_lock`` so that
    concurrent pushes cannot interleave their transactions; after sending,
    we wait for every transaction to be accepted before releasing the lock.

    Args:
        entries: entries to publish.

    Returns:
        The client's publish response on success, ``None`` on failure.

    Raises:
        ValueError: once ``CONSECUTIVES_PUSH_ERRORS_LIMIT`` consecutive
            publish attempts have failed.
    """
    try:
        logger.info(f"🏋️ PUSHER: 👷‍♂️ processing {len(entries)} new asset(s) to push...")
        if self.is_publishing_on_chain:
            # Hold the lock across send + acceptance so a concurrent push
            # cannot start before this batch's transactions are settled.
            async with self.onchain_lock:
                start_t = time.time()
                response = await self.client.publish_entries(entries)
                await self.wait_for_publishing_acceptance(response)
        else:
            start_t = time.time()
            response = await self.client.publish_entries(entries)
        end_t = time.time()
        logger.info(
            f"🏋️ PUSHER: ✅ Successfully published {len(entries)} entrie(s)! "
            f"(took {(end_t - start_t):.2f}s)"
        )
        # A successful push resets the consecutive-failure counter.
        self.consecutive_push_error = 0
        return response
    except Exception as e:
        logger.error(f"🏋️ PUSHER: ⛔ could not publish entrie(s): {e}")
        self.consecutive_push_error += 1
        if self.consecutive_push_error >= CONSECUTIVES_PUSH_ERRORS_LIMIT:
            raise ValueError(
                f"⛔ Pusher failed {self.consecutive_push_error} times in a row. Stopping."
            )
        return None
8 changes: 3 additions & 5 deletions price-pusher/price_pusher/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,15 @@ async def _handle_listener(self, listener: PriceListener) -> None:
"""
while True:
await listener.notification_event.wait()
assets_to_push = listener.price_config.get_all_assets()
logger.info(
f"💡 Notification received from LISTENER [{listener.id}] ! "
"Sending entries into queue for: "
f"{assets_to_push}"
f"{listener.data_config}"
)
entries_to_push = self._flush_entries_for_assets(assets_to_push)
entries_to_push = self._flush_entries_for_assets(listener.data_config)
if len(entries_to_push) > 0:
await self.push_queue.put(entries_to_push)
# Wait for the task to be completed before clearing the notification
await self.push_queue.join()
await self.push_queue.join() # Wait for the entries to be processed
listener.notification_event.clear()

async def _pusher_service(self) -> None:
Expand Down
3 changes: 0 additions & 3 deletions price-pusher/tests/unit/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ async def test_handle_listener(orchestrator, mock_listener, caplog):
except asyncio.CancelledError:
pass

# Assertions
mock_listener.price_config.get_all_assets.assert_called_once()

# Check logs
assert any(
"💡 Notification received from LISTENER" in record.message for record in caplog.records
Expand Down

0 comments on commit 30eb08e

Please sign in to comment.