Skip to content

Commit

Permalink
fix: 7d aggregate calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
xadahiya committed Jul 26, 2023
1 parent 2ca86b4 commit 00aa45b
Showing 1 changed file with 12 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from ipfs_client.main import AsyncIPFSClient
from redis import asyncio as aioredis

from ..utils.helpers import get_pair_metadata
from ..utils.models.message_models import UniswapTradesAggregateSnapshot
from snapshotter.utils.callback_helpers import GenericProcessorAggregate
from snapshotter.utils.data_utils import get_project_epoch_snapshot
Expand Down Expand Up @@ -67,19 +66,13 @@ async def compute(

contract = project_id.split(':')[-2]

pair_metadata = await get_pair_metadata(
pair_address=contract,
redis_conn=redis,
rpc_helper=rpc_helper,
)
aggregate_snapshot = UniswapTradesAggregateSnapshot(
epochId=msg_obj.epochId,
)
# 24h snapshots fetches
snapshot_tasks = list()
self._logger.debug('fetching 24hour aggregates spaced out by 1 day over 7 days...')
# 1. find one day tail epoch
count = 0
count = 1
self._logger.debug(
'fetch # {}: queueing task for 24h aggregate snapshot for project ID {}'
' at currently received epoch ID {} with snasphot CID {}',
Expand All @@ -91,32 +84,23 @@ async def compute(
redis, msg_obj.snapshotCid, ipfs_reader, msg_obj.projectId,
),
)

seek_stop_flag = False
head_epoch = msg_obj.epochId
# 2. if not extrapolated, attempt to seek further back
while not seek_stop_flag or count < 7:
while not seek_stop_flag and count < 7:
tail_epoch_id, seek_stop_flag = await get_tail_epoch_id(
redis, protocol_state_contract, anchor_rpc_helper, head_epoch, 86400, msg_obj.projectId,
)
count += 1
if not seek_stop_flag or count > 1:
self._logger.debug(
'fetch # {}: for 7d aggregated trade volume calculations: '
'queueing task for 24h aggregate snapshot for project ID {} at rewinded epoch ID {}',
count, msg_obj.projectId, tail_epoch_id,
)
snapshot_tasks.append(
get_project_epoch_snapshot(
redis, protocol_state_contract, anchor_rpc_helper,
ipfs_reader, tail_epoch_id, msg_obj.projectId,
),
)
head_epoch = tail_epoch_id - 1
if count == 7:
self._logger.info(
'fetch # {}: reached 7 day limit for 24h aggregate snapshots for project ID {} at rewinded epoch ID {}',
count, msg_obj.projectId, tail_epoch_id,
snapshot_tasks.append(
get_project_epoch_snapshot(
redis, protocol_state_contract, anchor_rpc_helper,
ipfs_reader, tail_epoch_id, msg_obj.projectId,
),
)
head_epoch = tail_epoch_id - 1

all_snapshots = await asyncio.gather(*snapshot_tasks, return_exceptions=True)
self._logger.debug(
'for 7d aggregated trade volume calculations: fetched {} '
Expand All @@ -136,4 +120,6 @@ async def compute(

if not all(complete_flags) or count < 7:
aggregate_snapshot.complete = False
else:
aggregate_snapshot.complete = True
return aggregate_snapshot

0 comments on commit 00aa45b

Please sign in to comment.