Skip to content

Commit

Permalink
chore: check the last updated timestamp before running a cron
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Feb 28, 2024
1 parent 60ce15f commit 9058bb1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
1 change: 1 addition & 0 deletions icon_stats/crons/ecosystem_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async def run_ecosystem_stats():
]:
await set_attr_func(model, column, func, func_p)

model.last_updated_timestamp = datetime.now(timezone.utc).timestamp()
await model.upsert()

prom_metrics.cron_ran.inc()
Expand Down
35 changes: 34 additions & 1 deletion icon_stats/main_cron.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
from datetime import datetime, timezone
from typing import Any, Callable, Coroutine, TypedDict

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
from prometheus_client import start_http_server
from sqlalchemy import text

from icon_stats.config import config
from icon_stats.crons import (
Expand All @@ -14,13 +16,20 @@
ecosystem_stats,
token_stats,
)
from icon_stats.db import get_session
from icon_stats.db_base import BaseSQLModel
from icon_stats.models.applications import Application
from icon_stats.models.contracts import Contract
from icon_stats.models.ecosystem import Ecosystem
from icon_stats.models.tokens import Token

# Type alias: any callable (arbitrary arguments) whose call returns a coroutine,
# i.e. something that can be awaited, such as an ``async def`` function.
AsyncCallable = Callable[..., Coroutine[Any, Any, Any]]


class Cron(TypedDict):
    """Typed description of one scheduled job entry in ``CRONS``."""

    # Coroutine function that is awaited to execute the job.
    func: AsyncCallable

    # Minimum time between runs; it is compared against a difference of
    # epoch timestamps, so the unit is seconds.
    interval: int

    # Name of the table in the ``stats`` schema whose ``last_updated_timestamp``
    # is checked before the startup run, or None when the job has no such table.
    table: str | None


CRONS: list[Cron] = [
Expand All @@ -31,26 +40,44 @@ class Cron(TypedDict):
{
"func": applications_refresh.run_applications_refresh,
"interval": 86400,
"table": None,
},
{
"func": contract_stats.run_contract_stats,
"interval": 3600 * 4,
"table": "contracts",
},
{
"func": token_stats.run_token_stats,
"interval": 3600 * 4,
"table": "tokens",
},
{
"func": application_stats.run_application_stats,
"interval": 3600 * 4,
"table": "applications",
},
{
"func": ecosystem_stats.run_ecosystem_stats,
"interval": 3600 * 6,
"table": "ecosystem",
},
]


async def get_last_updated_timestamp(model: str):
    """Return the newest ``last_updated_timestamp`` stored in ``stats.<model>``.

    Args:
        model: Name of a table inside the ``stats`` schema. The name is
            interpolated directly into the SQL text (identifiers cannot be
            sent as bound parameters), so it must only ever come from trusted,
            hard-coded configuration such as ``CRONS`` -- never from user input.

    Returns:
        The value of the ``last_updated_timestamp`` column from the first row
        of the descending sort, or ``None`` when the table has no rows (or the
        selected value is NULL). The scalar value itself is returned rather
        than a ``Row`` so that callers can do arithmetic on it directly.
    """
    # NOTE(review): if the backend is PostgreSQL, ``ORDER BY ... DESC`` sorts
    # NULLs first, so a single row with a NULL timestamp makes this return
    # None -- confirm that "re-run when any row was never stamped" is intended.
    query = text(
        f"""
        SELECT last_updated_timestamp
        FROM stats.{model}
        ORDER BY last_updated_timestamp DESC LIMIT 1;
        """
    )
    async with get_session("stats") as session:
        result = await session.execute(query)
        # ``first()`` would hand back a Row (tuple-like) and break the caller's
        # ``current_timestamp - last_updated_timestamp``; ``scalar()`` yields
        # the first column of the first row, or None when there is no row.
        return result.scalar()


async def main():
logger.info("Starting metrics server.")
start_http_server(config.METRICS_PORT, config.METRICS_ADDRESS)
Expand All @@ -59,7 +86,13 @@ async def main():

for i in CRONS:
# Run the jobs immediately in order
await i["func"]()
if i["table"] is not None:
last_updated_timestamp = await get_last_updated_timestamp(i["table"])
current_timestamp = datetime.now(timezone.utc).timestamp()
if last_updated_timestamp is None:
await i["func"]()
elif current_timestamp - last_updated_timestamp > i["interval"]:
await i["func"]()

# Then run them in the scheduler
sched.add_job(
Expand Down

0 comments on commit 9058bb1

Please sign in to comment.