Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/finalized project monitoring #77

Merged
merged 9 commits into from
May 8, 2024
76 changes: 75 additions & 1 deletion snapshotter/core_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import time
from typing import List

from fastapi import Depends
Expand All @@ -9,6 +10,10 @@
from fastapi_pagination import add_pagination
from fastapi_pagination import Page
from fastapi_pagination import paginate
from httpx import AsyncClient
from httpx import AsyncHTTPTransport
from httpx import Limits
from httpx import Timeout
from ipfs_client.main import AsyncIPFSClientSingleton
from pydantic import Field
from redis import asyncio as aioredis
Expand All @@ -21,27 +26,32 @@
from snapshotter.auth.helpers.helpers import rate_limit_auth_check
from snapshotter.auth.helpers.redis_conn import RedisPoolCache as AuthRedisPoolCache
from snapshotter.settings.config import settings
from snapshotter.utils.callback_helpers import send_failure_notifications_async
from snapshotter.utils.data_utils import get_project_epoch_snapshot
from snapshotter.utils.data_utils import get_project_finalized_cid
from snapshotter.utils.data_utils import get_project_time_series_data
from snapshotter.utils.data_utils import get_snapshotter_project_status
from snapshotter.utils.data_utils import get_snapshotter_status
from snapshotter.utils.data_utils import snapshotter_last_finalized_check
from snapshotter.utils.default_logger import logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import SnapshotterEpochProcessingReportItem
from snapshotter.utils.models.data_models import SnapshotterIssue
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotterStates
from snapshotter.utils.models.data_models import SnapshotterStateUpdate
from snapshotter.utils.models.data_models import TaskStatusRequest
from snapshotter.utils.models.data_models import UnfinalizedProject
from snapshotter.utils.redis.rate_limiter import load_rate_limiter_scripts
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import active_status_key
from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key
from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key
from snapshotter.utils.redis.redis_keys import project_last_unfinalized_sent_key
from snapshotter.utils.rpc import RpcHelper


REDIS_CONN_CONF = {
'host': settings.redis.host,
'port': settings.redis.port,
Expand All @@ -52,6 +62,10 @@
# setup logging
rest_logger = logger.bind(module='Powerloom|CoreAPI')

# Disables unnecessary logging for httpx requests
rest_logger.disable('httpcore._trace')
rest_logger.disable('httpx._client')
xadahiya marked this conversation as resolved.
Show resolved Hide resolved


protocol_state_contract_abi = read_json_file(
settings.protocol_state.abi,
Expand Down Expand Up @@ -104,6 +118,19 @@ async def startup_boilerplate():
await app.state.ipfs_singleton.init_sessions()
app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client
app.state.epoch_size = 0
app.state.last_unfinalized_check = 0

app.state.async_client = AsyncClient(
timeout=Timeout(timeout=5.0),
follow_redirects=False,
transport=AsyncHTTPTransport(
limits=Limits(
max_connections=200,
max_keepalive_connections=50,
keepalive_expiry=None,
),
),
)


# Health check endpoint
Expand Down Expand Up @@ -132,6 +159,53 @@ async def health_check(
'status': 'error',
'message': 'Snapshotter is not active',
}

# check if there are any unfinalized projects every 10 minutes
if int(time.time()) - app.state.last_unfinalized_check >= 600:
app.state.last_unfinalized_check = int(time.time())
unfinalized_projects: List[UnfinalizedProject] = await snapshotter_last_finalized_check(
redis_conn,
request.app.state.protocol_state_contract,
request.app.state.anchor_rpc_helper,
)

if unfinalized_projects:
for unfinalized_project in unfinalized_projects:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything looks good! My only concern is that if the node is down for some reason, since core_api is independent of that, it will keep blasting a lot of failure notifications to Slack, hitting the rate limit. Let's limit the number of failure notifications we send out per minute.

@anomit, what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes memoize this to have a check on the outflow of notifications

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a check to see if an unfinalized project has already been reported to limit the number of notifications in the case where projects cannot finalize. The endpoint will only report a maximum of 5 projects per node every 10 minutes, and if the network goes down, they will only report as many projects that have been assigned to each node.

cc701ff


last_known_unfinalized_epoch = await redis_conn.get(
project_last_unfinalized_sent_key(unfinalized_project.projectId),
)

# Check if project's last unfinalized epoch has been reported
if (
last_known_unfinalized_epoch and
int(last_known_unfinalized_epoch) == unfinalized_project.lastFinalizedEpochId
):
continue

notification_message = SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.UNFINALIZED_PROJECT.value,
projectID=unfinalized_project.projectId,
epochId=str(unfinalized_project.currentEpochId),
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': 'Error : project has been unfinalized for > 50 epochs'}),
)
await send_failure_notifications_async(
client=app.state.async_client, message=notification_message,
)

await redis_conn.set(
project_last_unfinalized_sent_key(unfinalized_project.projectId),
unfinalized_project.lastFinalizedEpochId,
)

response.status_code = 503
return {
'status': 'error',
'message': 'Snapshotter has unfinalized projects',
}

return {'status': 'OK'}


Expand Down
68 changes: 68 additions & 0 deletions snapshotter/tests/last_finalized_check_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
from web3 import Web3

from snapshotter.settings.config import settings
from snapshotter.utils.default_logger import logger as rest_logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.rpc import RpcHelper

from snapshotter.settings.config import aggregator_config
from snapshotter.settings.config import projects_config
from snapshotter.utils.models.settings_model import AggregateOn
from snapshotter.utils.helper_functions import gen_multiple_type_project_id
from snapshotter.utils.helper_functions import gen_single_type_project_id
from snapshotter.utils.redis.redis_keys import stored_projects_key

from snapshotter.utils.data_utils import snapshotter_last_finalized_check


async def test_last_finalized_check():

anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc)
aioredis_pool = RedisPoolCache()
await aioredis_pool.populate()
redis_conn = aioredis_pool._aioredis_pool
protocol_state_contract_abi = read_json_file(
settings.protocol_state.abi,
rest_logger,
)
protocol_state_contract_address = settings.protocol_state.address
protocol_state_contract = anchor_rpc_helper.get_current_node()['web3_client'].eth.contract(
address=Web3.toChecksumAddress(
protocol_state_contract_address,
),
abi=protocol_state_contract_abi,
)

all_projects = []

for project_config in projects_config:
all_projects += [f'{project_config.project_type}:{project.lower()}:{settings.namespace}' for project in project_config.projects]

for config in aggregator_config:
if config.aggregate_on == AggregateOn.single_project:
project_ids = filter(lambda x: config.filters.projectId in x, all_projects)
all_projects += [
gen_single_type_project_id(config.project_type, project_id)
for project_id in project_ids
]
if config.aggregate_on == AggregateOn.multi_project:
all_projects.append(gen_multiple_type_project_id(config.project_type, config.projects_to_wait_for))

await redis_conn.sadd(
stored_projects_key(),
*all_projects,
)

unfinalized_projects = await snapshotter_last_finalized_check(
redis_conn,
protocol_state_contract,
anchor_rpc_helper,
)

print(unfinalized_projects)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_last_finalized_check())
44 changes: 44 additions & 0 deletions snapshotter/utils/data_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import os
from random import sample
from typing import List

import tenacity
Expand All @@ -22,6 +23,7 @@
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotterStatus
from snapshotter.utils.models.data_models import SnapshotterStatusReport
from snapshotter.utils.models.data_models import UnfinalizedProject
from snapshotter.utils.redis.redis_keys import project_finalized_data_zset
from snapshotter.utils.redis.redis_keys import project_first_epoch_hmap
from snapshotter.utils.redis.redis_keys import project_snapshotter_status_report_key
Expand Down Expand Up @@ -837,3 +839,45 @@ async def get_project_time_series_data(
ipfs_reader=ipfs_reader,
project_ids=project_ids,
)


async def snapshotter_last_finalized_check(
redis_conn: aioredis.Redis,
state_contract_obj,
rpc_helper,
):
try:
# get all projects from redis
all_projects = await redis_conn.smembers(stored_projects_key())
all_projects = [project_id.decode('utf-8') for project_id in all_projects]

# take random sample of 5 projects
samples = sample(all_projects, 5)

# get current epoch and the last finalized snapshot for each sampled project
tasks = [state_contract_obj.functions.currentEpoch()]
tasks += [
state_contract_obj.functions.lastFinalizedSnapshot(project_id)
for project_id in samples
]

[current_epoch, *last_finalized_snapshots] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn)

current_epoch_id = current_epoch[2]

# check if the last finalized snapshot is older than 50 epochs
unfinalized_projects = []
for project_id, last_finalized_snapshot in zip(samples, last_finalized_snapshots):
if current_epoch_id - last_finalized_snapshot > 50:
unfinalized_project = UnfinalizedProject(
projectId=project_id,
currentEpochId=current_epoch_id,
lastFinalizedEpochId=last_finalized_snapshot,
)
unfinalized_projects.append(unfinalized_project)

return unfinalized_projects

except Exception as e:
logger.warning('Error while checking for unfinalized snapshots', error=e)
return []
7 changes: 7 additions & 0 deletions snapshotter/utils/models/data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class SnapshotterReportState(Enum):
CRASHED_REPORTER_THREAD = 'CRASHED_REPORTER_THREAD'
UNHEALTHY_EPOCH_PROCESSING = 'UNHEALTHY_EPOCH_PROCESSING'
ONLY_FINALIZED_SNAPSHOT_RECIEVED = 'ONLY_FINALIZED_SNAPSHOT_RECIEVED'
UNFINALIZED_PROJECT = 'UNFINALIZED_PROJECT'


class SnapshotterStates(Enum):
Expand Down Expand Up @@ -198,6 +199,12 @@ class TaskStatusRequest(BaseModel):
wallet_address: str


class UnfinalizedProject(BaseModel):
projectId: str
currentEpochId: int
lastFinalizedEpochId: int


class GenericTxnIssue(BaseModel):
accountAddress: str
issueType: str
Expand Down
6 changes: 6 additions & 0 deletions snapshotter/utils/redis/redis_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ def last_snapshot_processing_complete_timestamp_key():
def last_epoch_detected_timestamp_key():
return f'lastEpochDetectedTimestamp:{settings.namespace}'


def last_epoch_detected_epoch_id_key():
return f'lastEpochDetectedEpochID:{settings.namespace}'


def submitted_base_snapshots_key(epoch_id, project_id):
return f'submittedBaseSnapshots:{epoch_id}:{project_id}'

Expand All @@ -124,3 +126,7 @@ def submitted_unfinalized_snapshot_cids(project_id):

def process_hub_core_start_timestamp():
return f'processHubCoreStartTimestamp:{settings.namespace}'


def project_last_unfinalized_sent_key(project_id):
return f'projectID:{project_id}:lastFinalizedEpoch'