From 6bcdc3a7a72e7893f47d5991b098e136980bd559 Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Tue, 9 Apr 2024 23:13:07 -0400 Subject: [PATCH 1/7] add: data util for checking project finalization --- snapshotter/utils/data_utils.py | 43 +++++++++++++++++++++++++ snapshotter/utils/models/data_models.py | 6 ++++ 2 files changed, 49 insertions(+) diff --git a/snapshotter/utils/data_utils.py b/snapshotter/utils/data_utils.py index 7f717dd..47dab8b 100644 --- a/snapshotter/utils/data_utils.py +++ b/snapshotter/utils/data_utils.py @@ -2,6 +2,7 @@ import json import os from typing import List +from random import sample import tenacity from ipfs_client.dag import IPFSAsyncClientError @@ -22,6 +23,7 @@ from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.models.data_models import SnapshotterStatus from snapshotter.utils.models.data_models import SnapshotterStatusReport +from snapshotter.utils.models.data_models import UnfinalizedProject from snapshotter.utils.redis.redis_keys import project_finalized_data_zset from snapshotter.utils.redis.redis_keys import project_first_epoch_hmap from snapshotter.utils.redis.redis_keys import project_snapshotter_status_report_key @@ -831,3 +833,44 @@ async def get_project_time_series_data( ipfs_reader=ipfs_reader, project_ids=project_ids, ) + +async def snapshotter_last_finalized_check( + redis_conn: aioredis.Redis, + state_contract_obj, + rpc_helper, +): + try: + # get all projects from redis + all_projects = await redis_conn.smembers(stored_projects_key()) + all_projects = [project_id.decode('utf-8') for project_id in all_projects] + + # take random sample of 5 projects + samples = sample(all_projects, 5) + + # get current epoch and the last finalized snapshot for each sampled project + tasks = [state_contract_obj.functions.currentEpoch()] + tasks += [ + state_contract_obj.functions.lastFinalizedSnapshot(project_id) + for project_id in samples + ] + + [current_epoch, *last_finalized_snapshots] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn) + + current_epoch_id = current_epoch[2] + + # check if the last finalized snapshot is older than 50 epochs + unfinalized_projects = [] + for project_id, last_finalized_snapshot in zip(samples, last_finalized_snapshots): + if current_epoch_id - last_finalized_snapshot > 50: + unfinalized_project = UnfinalizedProject( + projectId=project_id, + currentEpochId=current_epoch_id, + lastFinalizedEpochId=last_finalized_snapshot, + ) + unfinalized_projects.append(unfinalized_project) + + return unfinalized_projects + + except Exception as e: + logger.warning('Error while checking for unfinalized snapshots', error=e) + return [] \ No newline at end of file diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index 2a326f0..df85b54 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -32,6 +32,7 @@ class SnapshotterReportState(Enum): CRASHED_REPORTER_THREAD = 'CRASHED_REPORTER_THREAD' UNHEALTHY_EPOCH_PROCESSING = 'UNHEALTHY_EPOCH_PROCESSING' ONLY_FINALIZED_SNAPSHOT_RECIEVED = 'ONLY_FINALIZED_SNAPSHOT_RECIEVED' + UNFINALIZED_PROJECT = 'UNFINALIZED_PROJECT' class SnapshotterStates(Enum): @@ -195,3 +196,8 @@ class UnfinalizedSnapshot(BaseModel): class TaskStatusRequest(BaseModel): task_type: str wallet_address: str + +class UnfinalizedProject(BaseModel): + projectId: str + currentEpochId: int + lastFinalizedEpochId: int From 02934574f814bde608bc7c1ddac6850c9297171a Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Tue, 9 Apr 2024 23:14:03 -0400 Subject: [PATCH 2/7] add: test for project finalization check --- .../tests/last_finalized_check_test.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 snapshotter/tests/last_finalized_check_test.py diff --git a/snapshotter/tests/last_finalized_check_test.py b/snapshotter/tests/last_finalized_check_test.py new file mode 100644 index 0000000..2b81d5c --- /dev/null +++ b/snapshotter/tests/last_finalized_check_test.py @@ -0,0 +1,68 @@ +import asyncio +from web3 import Web3 + +from snapshotter.settings.config import settings +from snapshotter.utils.default_logger import logger as rest_logger +from snapshotter.utils.file_utils import read_json_file +from snapshotter.utils.redis.redis_conn import RedisPoolCache +from snapshotter.utils.rpc import RpcHelper + +from snapshotter.settings.config import aggregator_config +from snapshotter.settings.config import projects_config +from snapshotter.utils.models.settings_model import AggregateOn +from snapshotter.utils.helper_functions import gen_multiple_type_project_id +from snapshotter.utils.helper_functions import gen_single_type_project_id +from snapshotter.utils.redis.redis_keys import stored_projects_key + +from snapshotter.utils.data_utils import snapshotter_last_finalized_check + + +async def test_last_finalized_check(): + + anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + aioredis_pool = RedisPoolCache() + await aioredis_pool.populate() + redis_conn = aioredis_pool._aioredis_pool + protocol_state_contract_abi = read_json_file( + settings.protocol_state.abi, + rest_logger, + ) + protocol_state_contract_address = settings.protocol_state.address + protocol_state_contract = anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( + address=Web3.toChecksumAddress( + protocol_state_contract_address, + ), + abi=protocol_state_contract_abi, + ) + + all_projects = [] + + for project_config in projects_config: + all_projects += [f'{project_config.project_type}:{project.lower()}:{settings.namespace}' for project in project_config.projects] + + for config in aggregator_config: + if config.aggregate_on == AggregateOn.single_project: + project_ids = filter(lambda x: config.filters.projectId in x, all_projects) + all_projects += [ + gen_single_type_project_id(config.project_type, project_id) + for project_id in project_ids + ] + if config.aggregate_on == AggregateOn.multi_project: + all_projects.append(gen_multiple_type_project_id(config.project_type, config.projects_to_wait_for)) + + await redis_conn.sadd( + stored_projects_key(), + *all_projects, + ) + + unfinalized_projects = await snapshotter_last_finalized_check( + redis_conn, + protocol_state_contract, + anchor_rpc_helper, + ) + + print(unfinalized_projects) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(test_last_finalized_check()) \ No newline at end of file From 90a0fe0d00c13b57f6f8d8b280594efb4b7e2db5 Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Tue, 9 Apr 2024 23:14:27 -0400 Subject: [PATCH 3/7] update: core-api health endpoint to check project finalization --- snapshotter/core_api.py | 53 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index c6a7ad2..5053cf1 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -1,4 +1,5 @@ import json +import time from typing import List from fastapi import Depends @@ -9,6 +10,10 @@ from fastapi_pagination import add_pagination from fastapi_pagination import Page from fastapi_pagination import paginate +from httpx import AsyncClient +from httpx import AsyncHTTPTransport +from httpx import Limits +from httpx import Timeout from ipfs_client.main import AsyncIPFSClientSingleton from pydantic import Field from redis import asyncio as aioredis @@ -21,17 +26,22 @@ from snapshotter.auth.helpers.helpers import rate_limit_auth_check from snapshotter.auth.helpers.redis_conn import RedisPoolCache as AuthRedisPoolCache from snapshotter.settings.config import settings +from snapshotter.utils.callback_helpers import send_failure_notifications_async from snapshotter.utils.data_utils import get_project_epoch_snapshot from snapshotter.utils.data_utils import get_project_finalized_cid from snapshotter.utils.data_utils import get_project_time_series_data from snapshotter.utils.data_utils import get_snapshotter_project_status from snapshotter.utils.data_utils import get_snapshotter_status +from snapshotter.utils.data_utils import snapshotter_last_finalized_check from snapshotter.utils.default_logger import logger from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import SnapshotterEpochProcessingReportItem from snapshotter.utils.models.data_models import SnapshotterStates from snapshotter.utils.models.data_models import SnapshotterStateUpdate +from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.models.data_models import TaskStatusRequest +from snapshotter.utils.models.data_models import UnfinalizedProject from snapshotter.utils.redis.rate_limiter import load_rate_limiter_scripts from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.redis.redis_keys import active_status_key @@ -103,6 +113,19 @@ async def startup_boilerplate(): await app.state.ipfs_singleton.init_sessions() app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client app.state.epoch_size = 0 + app.state.last_unfinalized_check = 0 + + app.state.async_client = AsyncClient( + timeout=Timeout(timeout=5.0), + follow_redirects=False, + transport=AsyncHTTPTransport( + limits=Limits( + max_connections=200, + max_keepalive_connections=50, + keepalive_expiry=None, + ), + ), + ) # Health check endpoint @@ -131,6 +154,36 @@ async def health_check( 'status': 'error', 'message': 'Snapshotter is not active', } + + # check if there are any unfinalized projects every 10 minutes + if int(time.time()) - app.state.last_unfinalized_check >= 600: + app.state.last_unfinalized_check = int(time.time()) + unfinalized_projects: List[UnfinalizedProject] = await snapshotter_last_finalized_check( + redis_conn, + request.app.state.protocol_state_contract, + request.app.state.anchor_rpc_helper, + ) + + if unfinalized_projects: + for unfinalized_project in unfinalized_projects: + notification_message = SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.UNFINALIZED_PROJECT.value, + projectID=unfinalized_project.projectId, + epochId=str(unfinalized_project.currentEpochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': 'Error : project has been unfinalized for > 50 epochs'}), + ) + await send_failure_notifications_async( + client=app.state.async_client, message=notification_message, + ) + + response.status_code = 503 + return { + 'status': 'error', + 'message': 'Snapshotter has unfinalized projects', + } + return {'status': 'OK'} From ebdcd5eb2265075b311c4b676df4f1aa3c32e0ca Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Wed, 10 Apr 2024 18:46:49 -0400 Subject: [PATCH 4/7] chore: disable excessive httpx logging --- snapshotter/core_api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 5053cf1..10738c9 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -62,6 +62,10 @@ # setup logging rest_logger = logger.bind(module='Powerloom|CoreAPI') +# Disables unnecessary logging for httpx requests +rest_logger.disable('httpcore._trace') +rest_logger.disable('httpx._client') + protocol_state_contract_abi = read_json_file( settings.protocol_state.abi, From 8e5754aff5c491ed6e9e4c011a2d8793a9d24807 Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Wed, 10 Apr 2024 18:59:04 -0400 Subject: [PATCH 5/7] fix: pre-commit formatting --- snapshotter/core_api.py | 7 +++---- snapshotter/utils/data_utils.py | 13 +++++++------ snapshotter/utils/models/data_models.py | 1 + 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 10738c9..456702c 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -36,10 +36,10 @@ from snapshotter.utils.default_logger import logger from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import SnapshotterEpochProcessingReportItem -from snapshotter.utils.models.data_models import SnapshotterStates -from snapshotter.utils.models.data_models import SnapshotterStateUpdate from snapshotter.utils.models.data_models import SnapshotterIssue from snapshotter.utils.models.data_models import SnapshotterReportState +from snapshotter.utils.models.data_models import SnapshotterStates +from snapshotter.utils.models.data_models import SnapshotterStateUpdate from snapshotter.utils.models.data_models import TaskStatusRequest from snapshotter.utils.models.data_models import UnfinalizedProject from snapshotter.utils.redis.rate_limiter import load_rate_limiter_scripts @@ -51,7 +51,6 @@ from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key from snapshotter.utils.rpc import RpcHelper - REDIS_CONN_CONF = { 'host': settings.redis.host, 'port': settings.redis.port, @@ -158,7 +157,7 @@ async def health_check( 'status': 'error', 'message': 'Snapshotter is not active', } - + # check if there are any unfinalized projects every 10 minutes if int(time.time()) - app.state.last_unfinalized_check >= 600: app.state.last_unfinalized_check = int(time.time()) diff --git a/snapshotter/utils/data_utils.py b/snapshotter/utils/data_utils.py index 47dab8b..ebcdd88 100644 --- a/snapshotter/utils/data_utils.py +++ b/snapshotter/utils/data_utils.py @@ -1,8 +1,8 @@ import asyncio import json import os -from typing import List from random import sample +from typing import List import tenacity from ipfs_client.dag import IPFSAsyncClientError @@ -834,11 +834,12 @@ async def get_project_time_series_data( project_ids=project_ids, ) + async def snapshotter_last_finalized_check( redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, -): +): try: # get all projects from redis all_projects = await redis_conn.smembers(stored_projects_key()) @@ -850,7 +851,7 @@ async def snapshotter_last_finalized_check( # get current epoch and the last finalized snapshot for each sampled project tasks = [state_contract_obj.functions.currentEpoch()] tasks += [ - state_contract_obj.functions.lastFinalizedSnapshot(project_id) + state_contract_obj.functions.lastFinalizedSnapshot(project_id) for project_id in samples ] @@ -868,9 +869,9 @@ async def snapshotter_last_finalized_check( lastFinalizedEpochId=last_finalized_snapshot, ) unfinalized_projects.append(unfinalized_project) - + return unfinalized_projects - + except Exception as e: logger.warning('Error while checking for unfinalized snapshots', error=e) - return [] \ No newline at end of file + return [] diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index df85b54..7ecc1de 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -197,6 +197,7 @@ class TaskStatusRequest(BaseModel): task_type: str wallet_address: str + class UnfinalizedProject(BaseModel): projectId: str currentEpochId: int From cc701ffee49d2ebdd8e69904a8e0a1292a54a643 Mon Sep 17 00:00:00 2001 From: Seth-Schmidt Date: Mon, 29 Apr 2024 21:51:14 -0400 Subject: [PATCH 6/7] chore: add project last sent cache to /health to limit requests --- snapshotter/core_api.py | 28 ++++++++++++++++++++++----- snapshotter/utils/redis/redis_keys.py | 8 +++++++- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 5e4be2f..3fcbd63 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -48,7 +48,8 @@ from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key -from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key +from snapshotter.utils.redis.redis_keys import project_last_finalized_sent_key +from snapshotter.utils.redis.redis_keys import project_last_unfinalized_sent_key from snapshotter.utils.rpc import RpcHelper REDIS_CONN_CONF = { @@ -170,6 +171,18 @@ async def health_check( if unfinalized_projects: for unfinalized_project in unfinalized_projects: + + last_known_unfinalized_epoch = await redis_conn.get( + project_last_unfinalized_sent_key(unfinalized_project.projectId), + ) + + # Check if project's last unfinalized epoch has been reported + if ( + last_known_unfinalized_epoch and + int(last_known_unfinalized_epoch) == unfinalized_project.lastFinalizedEpochId + ): + continue + notification_message = SnapshotterIssue( instanceID=settings.instance_id, issueType=SnapshotterReportState.UNFINALIZED_PROJECT.value, @@ -182,6 +195,11 @@ async def health_check( client=app.state.async_client, message=notification_message, ) + await redis_conn.set( + project_last_unfinalized_sent_key(unfinalized_project.projectId), + unfinalized_project.lastFinalizedEpochId, + ) + response.status_code = 503 return { 'status': 'error', @@ -335,7 +353,7 @@ async def get_project_last_finalized_epoch_info( # get project last finalized epoch from redis project_last_finalized_epoch = await request.app.state.redis_pool.get( - project_last_finalized_epoch_key(project_id), + project_last_finalized_sent_key(project_id), ) if project_last_finalized_epoch is None: @@ -356,7 +374,7 @@ async def get_project_last_finalized_epoch_info( epoch_finalized = True project_last_finalized_epoch = epoch_id await request.app.state.redis_pool.set( - project_last_finalized_epoch_key(project_id), + project_last_finalized_sent_key(project_id), project_last_finalized_epoch, ) else: @@ -894,7 +912,7 @@ async def get_task_status_post( # check redis first, if doesn't exist, fetch from contract last_finalized_epoch = await request.app.state.redis_pool.get( - project_last_finalized_epoch_key(project_id), + project_last_finalized_sent_key(project_id), ) if last_finalized_epoch is None: @@ -906,7 +924,7 @@ async def get_task_status_post( # cache it in redis if last_finalized_epoch != 0: await request.app.state.redis_pool.set( - project_last_finalized_epoch_key(project_id), + project_last_finalized_sent_key(project_id), last_finalized_epoch, ) else: diff --git a/snapshotter/utils/redis/redis_keys.py b/snapshotter/utils/redis/redis_keys.py index 203fa21..b42aa9b 100644 --- a/snapshotter/utils/redis/redis_keys.py +++ b/snapshotter/utils/redis/redis_keys.py @@ -68,7 +68,7 @@ def source_chain_epoch_size_key(): return 'sourceChainEpochSize' -def project_last_finalized_epoch_key(project_id): +def project_last_finalized_sent_key(project_id): return f'projectID:{project_id}:lastFinalizedEpoch' @@ -111,9 +111,11 @@ def last_snapshot_processing_complete_timestamp_key(): def last_epoch_detected_timestamp_key(): return f'lastEpochDetectedTimestamp:{settings.namespace}' + def last_epoch_detected_epoch_id_key(): return f'lastEpochDetectedEpochID:{settings.namespace}' + def submitted_base_snapshots_key(epoch_id, project_id): return f'submittedBaseSnapshots:{epoch_id}:{project_id}' @@ -124,3 +126,7 @@ def submitted_unfinalized_snapshot_cids(project_id): def process_hub_core_start_timestamp(): return f'processHubCoreStartTimestamp:{settings.namespace}' + + +def project_last_unfinalized_sent_key(project_id): + return f'projectID:{project_id}:lastFinalizedEpoch' From dd26f4144c1dd2ef4202c6bbbc35624eef729141 Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 3 May 2024 13:18:17 -0400 Subject: [PATCH 7/7] fix: rename last finalized epoch key --- snapshotter/core_api.py | 10 +++++----- snapshotter/utils/redis/redis_keys.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 3fcbd63..7074608 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -48,7 +48,7 @@ from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key -from snapshotter.utils.redis.redis_keys import project_last_finalized_sent_key +from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key from snapshotter.utils.redis.redis_keys import project_last_unfinalized_sent_key from snapshotter.utils.rpc import RpcHelper @@ -353,7 +353,7 @@ async def get_project_last_finalized_epoch_info( # get project last finalized epoch from redis project_last_finalized_epoch = await request.app.state.redis_pool.get( - project_last_finalized_sent_key(project_id), + project_last_finalized_epoch_key(project_id), ) if project_last_finalized_epoch is None: @@ -374,7 +374,7 @@ async def get_project_last_finalized_epoch_info( epoch_finalized = True project_last_finalized_epoch = epoch_id await request.app.state.redis_pool.set( - project_last_finalized_sent_key(project_id), + project_last_finalized_epoch_key(project_id), project_last_finalized_epoch, ) else: @@ -912,7 +912,7 @@ async def get_task_status_post( # check redis first, if doesn't exist, fetch from contract last_finalized_epoch = await request.app.state.redis_pool.get( - project_last_finalized_sent_key(project_id), + project_last_finalized_epoch_key(project_id), ) if last_finalized_epoch is None: @@ -924,7 +924,7 @@ async def get_task_status_post( # cache it in redis if last_finalized_epoch != 0: await request.app.state.redis_pool.set( - project_last_finalized_sent_key(project_id), + project_last_finalized_epoch_key(project_id), last_finalized_epoch, ) else: diff --git a/snapshotter/utils/redis/redis_keys.py b/snapshotter/utils/redis/redis_keys.py index b42aa9b..d96a78e 100644 --- a/snapshotter/utils/redis/redis_keys.py +++ b/snapshotter/utils/redis/redis_keys.py @@ -68,7 +68,7 @@ def source_chain_epoch_size_key(): return 'sourceChainEpochSize' -def project_last_finalized_sent_key(project_id): +def project_last_finalized_epoch_key(project_id): return f'projectID:{project_id}:lastFinalizedEpoch'