Commit 9ccf0fd

initial commit: update for cache integration

getjiggy committed Feb 9, 2024
1 parent f4092c1 commit 9ccf0fd
Showing 3 changed files with 126 additions and 145 deletions.
30 changes: 13 additions & 17 deletions snapshotter/system_event_detector.py
@@ -20,17 +20,18 @@
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import EpochReleasedEvent
from snapshotter.utils.models.data_models import EventBase
-from snapshotter.utils.models.data_models import ProjectsUpdatedEvent
from snapshotter.utils.models.data_models import SnapshotFinalizedEvent
from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent
from snapshotter.utils.rabbitmq_helpers import RabbitmqThreadedSelectLoopInteractor
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import event_detector_last_processed_block
+from snapshotter.utils.redis.redis_keys import last_epoch_detected_epoch_id_key
from snapshotter.utils.redis.redis_keys import last_epoch_detected_timestamp_key
from snapshotter.utils.rpc import get_event_sig_and_abi
from snapshotter.utils.rpc import RpcHelper



def rabbitmq_and_redis_cleanup(fn):
"""
A decorator function that wraps the given function and handles cleanup of RabbitMQ and Redis connections in case of
@@ -110,14 +111,14 @@ def __init__(self, name, **kwargs):
self._rabbitmq_queue = queue.Queue()
self._shutdown_initiated = False
self._logger = logger.bind(
-            module=f'{name}|{settings.namespace}-{settings.instance_id[:5]}',
+            module=name,
)

self._exchange = (
f'{settings.rabbitmq.setup.event_detector.exchange}:{settings.namespace}'
)
self._routing_key_prefix = (
-            f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}.'
+            f'event-detector:{settings.namespace}:{settings.instance_id}.'
)
self._aioredis_pool = None
self._redis_conn = None
@@ -131,7 +132,7 @@ def __init__(self, name, **kwargs):
)
self.contract_address = settings.protocol_state.address
self.contract = self.rpc_helper.get_current_node()['web3_client'].eth.contract(
-            address=Web3.toChecksumAddress(
+            address=Web3.to_checksum_address(
self.contract_address,
),
abi=self.contract_abi,
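The toChecksumAddress to to_checksum_address change tracks web3.py's v6 rename of camelCase helpers to snake_case; a minimal equivalence sketch (the address below is a placeholder, illustrative only):

from web3 import Web3

addr = '0x' + '11' * 20  # placeholder address, illustrative only
checksummed = Web3.to_checksum_address(addr)  # v6 spelling, as used in this commit
# Web3.toChecksumAddress(addr)                # v5 spelling, removed here
print(checksummed)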
@@ -140,19 +141,16 @@ def __init__(self, name, **kwargs):
# event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp);
# event SnapshotFinalized(uint256 indexed epochId, uint256 epochEnd, string projectId,
# string snapshotCid, uint256 timestamp);
-    # event ProjectsUpdated(string projectId, bool allowed);

EVENTS_ABI = {
'EpochReleased': self.contract.events.EpochReleased._get_event_abi(),
'SnapshotFinalized': self.contract.events.SnapshotFinalized._get_event_abi(),
-        'ProjectsUpdated': self.contract.events.ProjectsUpdated._get_event_abi(),
'allSnapshottersUpdated': self.contract.events.allSnapshottersUpdated._get_event_abi(),
}

EVENT_SIGS = {
'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)',
'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)',
-        'ProjectsUpdated': 'ProjectsUpdated(string,bool,uint256)',
'allSnapshottersUpdated': 'allSnapshottersUpdated(address,bool)',

}
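For reference, signature strings like these are conventionally keccak-hashed into topic0 values for log filtering; a minimal sketch of that step, which get_event_sig_and_abi presumably performs internally (an assumption, since the helper's body is not shown in this diff):

from web3 import Web3

EVENT_SIGS = {
    'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)',
    'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)',
    'allSnapshottersUpdated': 'allSnapshottersUpdated(address,bool)',
}

# keccak-256 of the canonical signature yields topic0, the indexed value
# that nodes match against in eth_getLogs-style queries
topics = {name: Web3.keccak(text=sig).hex() for name, sig in EVENT_SIGS.items()}
print(topics['EpochReleased'])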
@@ -197,6 +195,7 @@ async def get_events(self, from_block: int, to_block: int):

events = []
new_epoch_detected = False
+        latest_epoch_id = -1
for log in events_log:
if log.event == 'EpochReleased':
event = EpochReleasedEvent(
@@ -206,6 +205,7 @@ async def get_events(self, from_block: int, to_block: int):
timestamp=log.args.timestamp,
)
new_epoch_detected = True
+                latest_epoch_id = max(latest_epoch_id, log.args.epochId)
events.append((log.event, event))

elif log.event == 'SnapshotFinalized':
@@ -217,14 +217,6 @@ async def get_events(self, from_block: int, to_block: int):
timestamp=log.args.timestamp,
)
events.append((log.event, event))
-            elif log.event == 'ProjectsUpdated':
-                event = ProjectsUpdatedEvent(
-                    projectId=log.args.projectId,
-                    allowed=log.args.allowed,
-                    enableEpochId=log.args.enableEpochId,
-                    timestamp=int(time.time()),
-                )
-                events.append((log.event, event))
elif log.event == 'allSnapshottersUpdated':
event = SnapshottersUpdatedEvent(
snapshotterAddress=log.args.snapshotterAddress,
@@ -238,6 +230,10 @@ async def get_events(self, from_block: int, to_block: int):
last_epoch_detected_timestamp_key(),
int(time.time()),
)
+            await self._redis_conn.set(
+                last_epoch_detected_epoch_id_key(),
+                latest_epoch_id,
+            )

self._logger.info('Events: {}', events)
return events
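A hedged sketch of how a downstream health check might read back the two bookkeeping keys the detector now writes; the key helpers come from the imports above, while the function name and wiring here are illustrative:

from snapshotter.utils.redis.redis_keys import last_epoch_detected_epoch_id_key
from snapshotter.utils.redis.redis_keys import last_epoch_detected_timestamp_key

async def last_detected_epoch_state(redis_conn):  # hypothetical helper
    epoch_id = await redis_conn.get(last_epoch_detected_epoch_id_key())
    detected_at = await redis_conn.get(last_epoch_detected_timestamp_key())
    # both keys hold stringified integers; None means no epoch seen yet
    return (
        int(epoch_id) if epoch_id is not None else None,
        int(detected_at) if detected_at is not None else None,
    )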
@@ -298,7 +294,7 @@ async def _detect_events(self):
"""
while True:
try:
-                current_block = await self.rpc_helper.get_current_block(redis_conn=self._redis_conn)
+                current_block = await self.rpc_helper.get_current_block_number()
self._logger.info('Current block: {}', current_block)

except Exception as e:
@@ -421,4 +417,4 @@ def run(self):

self.ev_loop.run_until_complete(
self._detect_events(),
-        )
+        )
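The rabbitmq_and_redis_cleanup decorator at the top of this file is collapsed in the diff; below is a generic sketch of the cleanup-decorator pattern it names. The names and calls in the body are assumptions for illustration, and the project's actual implementation may differ:

import functools

def cleanup_on_exit(fn):  # hypothetical name; the real decorator is rabbitmq_and_redis_cleanup
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        try:
            return fn(self, *args, **kwargs)
        finally:
            # attribute names and methods below are assumptions for illustration
            interactor = getattr(self, '_rabbitmq_interactor', None)
            if interactor is not None:
                interactor.stop()
            redis_conn = getattr(self, '_redis_conn', None)
            if redis_conn is not None:
                self.ev_loop.run_until_complete(redis_conn.close())
    return wrapper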
79 changes: 57 additions & 22 deletions snapshotter/utils/data_utils.py
@@ -1,6 +1,5 @@
import asyncio
import json
-import os
from typing import List

import tenacity
@@ -11,10 +10,7 @@
from tenacity import stop_after_attempt
from tenacity import wait_random_exponential

-from snapshotter.settings.config import settings
from snapshotter.utils.default_logger import logger
-from snapshotter.utils.file_utils import read_json_file
-from snapshotter.utils.file_utils import write_json_file
from snapshotter.utils.models.data_models import ProjectStatus
from snapshotter.utils.models.data_models import SnapshotterIncorrectSnapshotSubmission
from snapshotter.utils.models.data_models import SnapshotterMissedSnapshotSubmission
@@ -135,6 +131,54 @@ async def w3_get_and_cache_finalized_cid(
return f'null_{epoch_id}', epoch_id


+async def get_project_last_finalized_cid_and_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id):
+    """
+    Get the last finalized CID and epoch for a given project ID.
+
+    Args:
+        redis_conn (aioredis.Redis): Redis connection object.
+        state_contract_obj: Contract object for the state contract.
+        rpc_helper: RPC helper object.
+        project_id (str): ID of the project.
+
+    Returns:
+        Tuple[str, int]: The last finalized CID and epoch for the given
+        project ID, or ('', 0) if nothing has been finalized yet.
+    """
+    project_last_finalized = await redis_conn.zrevrangebyscore(
+        project_finalized_data_zset(project_id),
+        max='+inf',
+        min='-inf',
+        withscores=True,
+        start=0,
+        num=1,
+    )
+
+    if project_last_finalized:
+        project_last_finalized_cid, project_last_finalized_epoch = project_last_finalized[0]
+        project_last_finalized_epoch = int(project_last_finalized_epoch)
+        project_last_finalized_cid = project_last_finalized_cid.decode('utf-8')
+
+        if project_last_finalized_cid and 'null' not in project_last_finalized_cid:
+            return project_last_finalized_cid, project_last_finalized_epoch
+
+    tasks = [
+        state_contract_obj.functions.lastFinalizedSnapshot(project_id),
+    ]
+
+    [last_finalized_epoch] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn)
+    logger.info(f'last finalized epoch for project {project_id} is {last_finalized_epoch}')
+
+    # fetch the finalized CID for the last finalized epoch
+    last_finalized_cid = await get_project_finalized_cid(
+        redis_conn, state_contract_obj, rpc_helper, last_finalized_epoch, project_id,
+    )
+
+    if last_finalized_cid and 'null' not in last_finalized_cid:
+        return last_finalized_cid, int(last_finalized_epoch)
+    else:
+        return '', 0


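A short usage sketch for the new helper; the connection objects are assumed to come from the surrounding codebase and the project id is a placeholder:

async def example_last_finalized(redis_conn, state_contract_obj, rpc_helper):  # illustrative wrapper
    cid, epoch = await get_project_last_finalized_cid_and_epoch(
        redis_conn, state_contract_obj, rpc_helper,
        project_id='demo:project:0x0',  # hypothetical project id
    )
    if not cid:
        # ('', 0) signals that nothing has been finalized for this project yet
        return None
    return cid, epoch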
# TODO: warmup cache to reduce RPC calls overhead
async def get_project_first_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id):
"""
@@ -216,23 +260,14 @@ async def get_submission_data(redis_conn: aioredis.Redis, cid, ipfs_reader, proj
if 'null' in cid:
return dict()

-    cached_data_path = os.path.join(settings.ipfs.local_cache_path, project_id, 'snapshots')
-    filename = f'{cid}.json'
+    logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid)
try:
-        submission_data = read_json_file(os.path.join(cached_data_path, filename))
-    except Exception as e:
-        # Fetch from IPFS
-        logger.trace('Error while reading from cache', error=e)
-        logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid)
-        try:
-            submission_data = await fetch_file_from_ipfs(ipfs_reader, cid)
-        except:
-            logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid)
-            submission_data = dict()
-        else:
-            # Cache it
-            write_json_file(cached_data_path, filename, submission_data)
-            submission_data = json.loads(submission_data)
+        submission_data = await fetch_file_from_ipfs(ipfs_reader, cid)
+    except:
+        logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid)
+        submission_data = dict()
+    else:
+        submission_data = json.loads(submission_data)
return submission_data
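With the local JSON cache (read_json_file/write_json_file and the settings.ipfs.local_cache_path import) removed above, every lookup now goes straight to IPFS. A hedged usage sketch follows; ipfs_reader is whatever async IPFS client the codebase wires in, and both identifiers below are placeholders:

async def example_fetch(redis_conn, ipfs_reader):  # illustrative wrapper
    data = await get_submission_data(
        redis_conn,
        'bafybeiexamplecid',            # hypothetical CID
        ipfs_reader,
        project_id='demo:project:0x0',  # hypothetical project id
    )
    # an empty dict covers both the 'null' sentinel CID and IPFS fetch failures
    return data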


@@ -352,7 +387,7 @@ async def build_projects_list_from_events(redis_conn: aioredis.Redis, state_cont
redis_conn=redis_conn,
)

-    current_block = await rpc_helper.get_current_block_number(redis_conn)
+    current_block = await rpc_helper.get_current_block_number()
event_sig, event_abi = get_event_sig_and_abi(EVENT_SIGS, EVENT_ABI)

# from start_block to current block, get all events in batches of 1000, 10 requests parallelly
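A sketch of the batching scheme this comment describes: 1000-block windows with at most 10 requests in flight at a time. fetch_range is a hypothetical stand-in for the project's actual log-fetching call:

import asyncio

async def fetch_logs_in_batches(fetch_range, start_block, current_block,
                                batch_size=1000, concurrency=10):
    # carve [start_block, current_block] into inclusive windows of batch_size
    windows = [
        (frm, min(frm + batch_size - 1, current_block))
        for frm in range(start_block, current_block + 1, batch_size)
    ]
    results = []
    # issue at most `concurrency` window fetches concurrently
    for i in range(0, len(windows), concurrency):
        chunk = windows[i:i + concurrency]
        results.extend(
            await asyncio.gather(*(fetch_range(frm, to) for frm, to in chunk)),
        )
    return results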
@@ -726,4 +761,4 @@ async def get_snapshotter_project_status(redis_conn: aioredis.Redis, project_id:
),
)

-    return project_status
+    return project_status
