diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 9d49698b..7074608e 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -1,4 +1,5 @@ import json +import time from typing import List from fastapi import Depends @@ -9,6 +10,10 @@ from fastapi_pagination import add_pagination from fastapi_pagination import Page from fastapi_pagination import paginate +from httpx import AsyncClient +from httpx import AsyncHTTPTransport +from httpx import Limits +from httpx import Timeout from ipfs_client.main import AsyncIPFSClientSingleton from pydantic import Field from redis import asyncio as aioredis @@ -21,17 +26,22 @@ from snapshotter.auth.helpers.helpers import rate_limit_auth_check from snapshotter.auth.helpers.redis_conn import RedisPoolCache as AuthRedisPoolCache from snapshotter.settings.config import settings +from snapshotter.utils.callback_helpers import send_failure_notifications_async from snapshotter.utils.data_utils import get_project_epoch_snapshot from snapshotter.utils.data_utils import get_project_finalized_cid from snapshotter.utils.data_utils import get_project_time_series_data from snapshotter.utils.data_utils import get_snapshotter_project_status from snapshotter.utils.data_utils import get_snapshotter_status +from snapshotter.utils.data_utils import snapshotter_last_finalized_check from snapshotter.utils.default_logger import logger from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import SnapshotterEpochProcessingReportItem +from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.models.data_models import SnapshotterStates from snapshotter.utils.models.data_models import SnapshotterStateUpdate from snapshotter.utils.models.data_models import TaskStatusRequest +from snapshotter.utils.models.data_models import UnfinalizedProject from snapshotter.utils.redis.rate_limiter import load_rate_limiter_scripts from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.redis.redis_keys import active_status_key @@ -39,9 +49,9 @@ from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key +from snapshotter.utils.redis.redis_keys import project_last_unfinalized_sent_key from snapshotter.utils.rpc import RpcHelper - REDIS_CONN_CONF = { 'host': settings.redis.host, 'port': settings.redis.port, @@ -52,6 +62,10 @@ # setup logging rest_logger = logger.bind(module='Powerloom|CoreAPI') +# Disables unnecessary logging for httpx requests +rest_logger.disable('httpcore._trace') +rest_logger.disable('httpx._client') + protocol_state_contract_abi = read_json_file( settings.protocol_state.abi, @@ -104,6 +118,19 @@ async def startup_boilerplate(): await app.state.ipfs_singleton.init_sessions() app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client app.state.epoch_size = 0 + app.state.last_unfinalized_check = 0 + + app.state.async_client = AsyncClient( + timeout=Timeout(timeout=5.0), + follow_redirects=False, + transport=AsyncHTTPTransport( + limits=Limits( + max_connections=200, + max_keepalive_connections=50, + keepalive_expiry=None, + ), + ), + ) # Health check endpoint @@ -132,6 +159,53 @@ async def health_check( 'status': 'error', 'message': 'Snapshotter is not active', } + + # check if there are any unfinalized projects every 10 minutes + if int(time.time()) - app.state.last_unfinalized_check >= 600: + app.state.last_unfinalized_check = int(time.time()) + unfinalized_projects: List[UnfinalizedProject] = await snapshotter_last_finalized_check( + redis_conn, + request.app.state.protocol_state_contract, + request.app.state.anchor_rpc_helper, + ) + + if unfinalized_projects: + for unfinalized_project in unfinalized_projects: + + last_known_unfinalized_epoch = await redis_conn.get( + project_last_unfinalized_sent_key(unfinalized_project.projectId), + ) + + # Check if project's last unfinalized epoch has been reported + if ( + last_known_unfinalized_epoch and + int(last_known_unfinalized_epoch) == unfinalized_project.lastFinalizedEpochId + ): + continue + + notification_message = SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.UNFINALIZED_PROJECT.value, + projectID=unfinalized_project.projectId, + epochId=str(unfinalized_project.currentEpochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': 'Error : project has been unfinalized for > 50 epochs'}), + ) + await send_failure_notifications_async( + client=app.state.async_client, message=notification_message, + ) + + await redis_conn.set( + project_last_unfinalized_sent_key(unfinalized_project.projectId), + unfinalized_project.lastFinalizedEpochId, + ) + + response.status_code = 503 + return { + 'status': 'error', + 'message': 'Snapshotter has unfinalized projects', + } + return {'status': 'OK'} diff --git a/snapshotter/tests/last_finalized_check_test.py b/snapshotter/tests/last_finalized_check_test.py new file mode 100644 index 00000000..2b81d5c4 --- /dev/null +++ b/snapshotter/tests/last_finalized_check_test.py @@ -0,0 +1,68 @@ +import asyncio +from web3 import Web3 + +from snapshotter.settings.config import settings +from snapshotter.utils.default_logger import logger as rest_logger +from snapshotter.utils.file_utils import read_json_file +from snapshotter.utils.redis.redis_conn import RedisPoolCache +from snapshotter.utils.rpc import RpcHelper + +from snapshotter.settings.config import aggregator_config +from snapshotter.settings.config import projects_config +from snapshotter.utils.models.settings_model import AggregateOn +from snapshotter.utils.helper_functions import gen_multiple_type_project_id +from snapshotter.utils.helper_functions import gen_single_type_project_id +from snapshotter.utils.redis.redis_keys import stored_projects_key + +from snapshotter.utils.data_utils import snapshotter_last_finalized_check + + +async def test_last_finalized_check(): + + anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + aioredis_pool = RedisPoolCache() + await aioredis_pool.populate() + redis_conn = aioredis_pool._aioredis_pool + protocol_state_contract_abi = read_json_file( + settings.protocol_state.abi, + rest_logger, + ) + protocol_state_contract_address = settings.protocol_state.address + protocol_state_contract = anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( + address=Web3.toChecksumAddress( + protocol_state_contract_address, + ), + abi=protocol_state_contract_abi, + ) + + all_projects = [] + + for project_config in projects_config: + all_projects += [f'{project_config.project_type}:{project.lower()}:{settings.namespace}' for project in project_config.projects] + + for config in aggregator_config: + if config.aggregate_on == AggregateOn.single_project: + project_ids = filter(lambda x: config.filters.projectId in x, all_projects) + all_projects += [ + gen_single_type_project_id(config.project_type, project_id) + for project_id in project_ids + ] + if config.aggregate_on == AggregateOn.multi_project: + all_projects.append(gen_multiple_type_project_id(config.project_type, config.projects_to_wait_for)) + + await redis_conn.sadd( + stored_projects_key(), + *all_projects, + ) + + unfinalized_projects = await snapshotter_last_finalized_check( + redis_conn, + protocol_state_contract, + anchor_rpc_helper, + ) + + print(unfinalized_projects) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(test_last_finalized_check()) \ No newline at end of file diff --git a/snapshotter/utils/data_utils.py b/snapshotter/utils/data_utils.py index 89b34fe0..332021c8 100644 --- a/snapshotter/utils/data_utils.py +++ b/snapshotter/utils/data_utils.py @@ -1,6 +1,7 @@ import asyncio import json import os +from random import sample from typing import List import tenacity @@ -22,6 +23,7 @@ from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.models.data_models import SnapshotterStatus from snapshotter.utils.models.data_models import SnapshotterStatusReport +from snapshotter.utils.models.data_models import UnfinalizedProject from snapshotter.utils.redis.redis_keys import project_finalized_data_zset from snapshotter.utils.redis.redis_keys import project_first_epoch_hmap from snapshotter.utils.redis.redis_keys import project_snapshotter_status_report_key @@ -837,3 +839,45 @@ async def get_project_time_series_data( ipfs_reader=ipfs_reader, project_ids=project_ids, ) + + +async def snapshotter_last_finalized_check( + redis_conn: aioredis.Redis, + state_contract_obj, + rpc_helper, +): + try: + # get all projects from redis + all_projects = await redis_conn.smembers(stored_projects_key()) + all_projects = [project_id.decode('utf-8') for project_id in all_projects] + + # take random sample of 5 projects + samples = sample(all_projects, 5) + + # get current epoch and the last finalized snapshot for each sampled project + tasks = [state_contract_obj.functions.currentEpoch()] + tasks += [ + state_contract_obj.functions.lastFinalizedSnapshot(project_id) + for project_id in samples + ] + + [current_epoch, *last_finalized_snapshots] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn) + + current_epoch_id = current_epoch[2] + + # check if the last finalized snapshot is older than 50 epochs + unfinalized_projects = [] + for project_id, last_finalized_snapshot in zip(samples, last_finalized_snapshots): + if current_epoch_id - last_finalized_snapshot > 50: + unfinalized_project = UnfinalizedProject( + projectId=project_id, + currentEpochId=current_epoch_id, + lastFinalizedEpochId=last_finalized_snapshot, + ) + unfinalized_projects.append(unfinalized_project) + + return unfinalized_projects + + except Exception as e: + logger.warning('Error while checking for unfinalized snapshots', error=e) + return [] diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index cacd4275..166765ee 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -33,6 +33,7 @@ class SnapshotterReportState(Enum): CRASHED_REPORTER_THREAD = 'CRASHED_REPORTER_THREAD' UNHEALTHY_EPOCH_PROCESSING = 'UNHEALTHY_EPOCH_PROCESSING' ONLY_FINALIZED_SNAPSHOT_RECIEVED = 'ONLY_FINALIZED_SNAPSHOT_RECIEVED' + UNFINALIZED_PROJECT = 'UNFINALIZED_PROJECT' class SnapshotterStates(Enum): @@ -198,6 +199,12 @@ class TaskStatusRequest(BaseModel): wallet_address: str +class UnfinalizedProject(BaseModel): + projectId: str + currentEpochId: int + lastFinalizedEpochId: int + + class GenericTxnIssue(BaseModel): accountAddress: str issueType: str diff --git a/snapshotter/utils/redis/redis_keys.py b/snapshotter/utils/redis/redis_keys.py index 203fa216..d96a78e9 100644 --- a/snapshotter/utils/redis/redis_keys.py +++ b/snapshotter/utils/redis/redis_keys.py @@ -111,9 +111,11 @@ def last_snapshot_processing_complete_timestamp_key(): def last_epoch_detected_timestamp_key(): return f'lastEpochDetectedTimestamp:{settings.namespace}' + def last_epoch_detected_epoch_id_key(): return f'lastEpochDetectedEpochID:{settings.namespace}' + def submitted_base_snapshots_key(epoch_id, project_id): return f'submittedBaseSnapshots:{epoch_id}:{project_id}' @@ -124,3 +126,7 @@ def submitted_unfinalized_snapshot_cids(project_id): def process_hub_core_start_timestamp(): return f'processHubCoreStartTimestamp:{settings.namespace}' + + +def project_last_unfinalized_sent_key(project_id): + return f'projectID:{project_id}:lastFinalizedEpoch'