diff --git a/icon_stats/api/v1/endpoints/exchanges_legacy.py b/icon_stats/api/v1/endpoints/exchanges_legacy.py index 92d357d..22b09b8 100644 --- a/icon_stats/api/v1/endpoints/exchanges_legacy.py +++ b/icon_stats/api/v1/endpoints/exchanges_legacy.py @@ -1,37 +1,64 @@ -from fastapi import APIRouter, Depends, Response +from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlmodel import select from pydantic import BaseModel +from datetime import datetime from icon_stats.db import get_session -from icon_stats.models.exchanges_legacy import ExchangesLegacy +from icon_stats.models.cmc_cryptocurrency_quotes_latest import CmcListingsLatestQuote router = APIRouter() +class ExchangesLegacyResponseData(BaseModel): + marketName: str + tradeName: str + createDate: datetime + price: float + prePrice: float + dailyRate: float | None + volume: float + marketCap: float + + class ExchangesLegacyResponse(BaseModel): result: int = 200 description: str = "" totalData: int | None = None - data: dict + data: ExchangesLegacyResponseData -@router.get("/stats/exchanges/legacy") +@router.get("/stats/exchanges/legacy", response_model=ExchangesLegacyResponse) async def get_exchange_prices( - response: Response, session: AsyncSession = Depends(get_session), -) -> list[ExchangesLegacyResponse]: +) -> ExchangesLegacyResponse: """Return list of delegations.""" query = ( - select(ExchangesLegacy) + select(CmcListingsLatestQuote) + .where(CmcListingsLatestQuote.base == "ICX") # noqa + .order_by(CmcListingsLatestQuote.last_updated.desc()) + .limit(1) ) result = await session.execute(query) - data = result.scalars().all() + data = result.scalars().first() + if not data: + raise HTTPException(status_code=status.HTTP_204_NO_CONTENT) # Raise an HTTPException for the 204 status + + exchanges_legacy_response_data = ExchangesLegacyResponseData( + marketName="coinmarketcap", + tradeName="icxusd", + createDate=data.last_updated, + price=data.price, + prePrice=data.price + (data.price * data.percent_change_24h / 100), + dailyRate=None, + volume=data.volume_24h, + marketCap=data.market_cap, + ) exchanges_legacy = ExchangesLegacyResponse( - data=data, + data=exchanges_legacy_response_data, ) return exchanges_legacy diff --git a/icon_stats/api/v1/router.py b/icon_stats/api/v1/router.py index 6370eea..93edf71 100644 --- a/icon_stats/api/v1/router.py +++ b/icon_stats/api/v1/router.py @@ -1,6 +1,9 @@ from fastapi import APIRouter from icon_stats.api.v1.endpoints import token_stats +from icon_stats.api.v1.endpoints import exchanges_legacy + api_router = APIRouter() -api_router.include_router(token_stats.router) +# api_router.include_router(token_stats.router) +api_router.include_router(exchanges_legacy.router) \ No newline at end of file diff --git a/icon_stats/clients/configs.py b/icon_stats/clients/configs.py index 869d1c8..11613c8 100644 --- a/icon_stats/clients/configs.py +++ b/icon_stats/clients/configs.py @@ -1,5 +1,4 @@ -# from pydantic_settings import BaseSettings, SettingsConfigDict -from pydantic import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict class BaseRestClientConfig(BaseSettings): endpoint: str @@ -11,7 +10,7 @@ class CmcClientConfig(BaseRestClientConfig): api_key: str = "" endpoint: str = "https://pro-api.coinmarketcap.com" - # model_config = SettingsConfigDict( - # case_sensitive=False, - # env_prefix='cmc_', - # ) + model_config = SettingsConfigDict( + case_sensitive=False, + env_prefix='cmc_', + ) diff --git a/icon_stats/crons/cmc_cryptocurrency_quotes_latest.py b/icon_stats/crons/cmc_cryptocurrency_quotes_latest.py index 065c62b..5f33659 100644 --- a/icon_stats/crons/cmc_cryptocurrency_quotes_latest.py +++ b/icon_stats/crons/cmc_cryptocurrency_quotes_latest.py @@ -1,6 +1,7 @@ from icon_stats.log import logger from sqlalchemy.orm import Session +from icon_stats.utils.times import convert_str_date from icon_stats.metrics import prom_metrics from icon_stats.models.cmc_cryptocurrency_quotes_latest import CmcListingsLatestQuote from icon_stats.clients.cmc import new_cmc_client @@ -22,11 +23,9 @@ async def run_cmc_cryptocurrency_quotes_latest(): logger.info("Ending top tokens cron") return + quote['last_updated'] = convert_str_date(quote['last_updated']) exchanges_legacy = CmcListingsLatestQuote(base='ICX', quote='USD', **quote) - # session = get_session('stats') - # session.add(exchanges_legacy) - # session.merge() await upsert_model(db_name='stats', model=exchanges_legacy) diff --git a/icon_stats/db_async.py b/icon_stats/db_async.py index 216be9d..4d159b7 100644 --- a/icon_stats/db_async.py +++ b/icon_stats/db_async.py @@ -63,7 +63,7 @@ def create_session_factories() -> dict[str, async_sessionmaker]: connection_string = create_conn_str(**db_config.__dict__) engine = create_async_engine( connection_string, - connect_args={"options": f"-c search_path={db_config.schema_}"}, + # connect_args={"options": f"-c search_path={db_config.schema_}"}, echo=True, ) session_factory = async_sessionmaker( diff --git a/icon_stats/log.py b/icon_stats/log.py index 115f78f..353ed63 100644 --- a/icon_stats/log.py +++ b/icon_stats/log.py @@ -5,21 +5,23 @@ def sink(message): - record = message.record - log_data = { - "timestamp": record["time"].strftime('%Y-%m-%d %H:%M:%S'), - } - - if "structured" in record["extra"]: - log_data["data"] = record["extra"]["structured"] - else: - log_data["message"] = record["message"] - - if config.LOG_FORMAT == "json": - return json.dumps(log_data, indent=config.STRUCTURED_SETTINGS.INDENT) - else: - return log_data["message"] - + try: + record = message.record + log_data = { + "timestamp": record["time"].strftime('%Y-%m-%d %H:%M:%S'), + } + + if "structured" in record["extra"]: + log_data["data"] = record["extra"]["structured"] + else: + log_data["message"] = record["message"] + + if config.LOG_FORMAT == "json": + return json.dumps(log_data, indent=config.STRUCTURED_SETTINGS.INDENT) + else: + return log_data["message"] + except Exception as e: + pass logger.remove() logger.add(sink, level=config.LOG_LEVEL, enqueue=True) diff --git a/icon_stats/main_cron.py b/icon_stats/main_cron.py index 5910077..9955924 100644 --- a/icon_stats/main_cron.py +++ b/icon_stats/main_cron.py @@ -1,5 +1,5 @@ -from typing import Callable, TypedDict - +import asyncio +from typing import Callable, TypedDict, Coroutine, Any from apscheduler.schedulers.background import BlockingScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler from loguru import logger @@ -9,28 +9,31 @@ from icon_stats.db import session_factory from icon_stats.crons import ( top_tokens, + cmc_cryptocurrency_quotes_latest, ) +AsyncCallable = Callable[..., Coroutine[Any, Any, Any]] + class Cron(TypedDict): - func: Callable + func: AsyncCallable interval: int CRONS: list[Cron] = [ + # { + # "func": top_tokens.run_top_tokens, + # "interval": 600, + # }, { - "func": top_tokens.run_top_tokens, - "interval": 600, - }, -] + "func": cmc_cryptocurrency_quotes_latest.run_cmc_cryptocurrency_quotes_latest, + "interval": 60, + } - -def run_cron_with_session(cron: Callable): - with session_factory() as session: - cron(session=session) +] -def main(): +async def main(): logger.info("Starting metrics server.") start_http_server(config.METRICS_PORT, config.METRICS_ADDRESS) @@ -39,19 +42,24 @@ def main(): for i in CRONS: # Run the jobs immediately in order - run_cron_with_session(i["func"]) + await i["func"]() # Then run them in the scheduler sched.add_job( - func=run_cron_with_session, + func=i["func"], trigger="interval", - args=[i["func"]], + # args=[i["func"]], seconds=i["interval"], id=i["func"].__name__, ) - + pass sched.start() + try: + while True: + await asyncio.sleep(60) # Sleep for a minute and check again. + except (KeyboardInterrupt, SystemExit): + pass if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/icon_stats/models/cmc_cryptocurrency_quotes_latest.py b/icon_stats/models/cmc_cryptocurrency_quotes_latest.py index 2789133..768ca9f 100644 --- a/icon_stats/models/cmc_cryptocurrency_quotes_latest.py +++ b/icon_stats/models/cmc_cryptocurrency_quotes_latest.py @@ -1,6 +1,8 @@ import datetime from sqlalchemy.orm import declared_attr +from sqlalchemy.types import DateTime +from sqlalchemy import Column from sqlmodel import Field, SQLModel @@ -20,10 +22,10 @@ class CmcListingsLatestQuote(SQLModel, table=True): market_cap_dominance: float fully_diluted_market_cap: float tvl: float | None - last_updated: datetime.datetime = Field(..., primary_key=True) + last_updated: datetime.datetime = Column(DateTime(timezone=True), primary_key=True) __table_args__ = {'schema': 'stats'} @declared_attr def __tablename__(cls) -> str: # noqa: N805 - return "cmc" + return "cmc_listings_latest_quote" diff --git a/icon_stats/utils/times.py b/icon_stats/utils/times.py new file mode 100644 index 0000000..6596762 --- /dev/null +++ b/icon_stats/utils/times.py @@ -0,0 +1,26 @@ +from datetime import datetime, timezone + +def convert_str_date(date_str: str) -> datetime: + # Check if the date string ends with 'Z' (which means it's in UTC) + if date_str.endswith("Z"): + # If it does, we remove the 'Z' and parse the datetime as UTC. + # The 'Z' is not supported by the fromisoformat method, so we have to handle it + # manually. + date_str = date_str.rstrip("Z") + last_updated = datetime.fromisoformat(date_str) + last_updated = last_updated.replace(tzinfo=timezone.utc) + else: + # If the string doesn't end with 'Z', we assume it's in local time (this may + # not be a correct assumption depending on your data) + # Here, you might want to handle other formats or timezones if necessary. + try: + last_updated = datetime.fromisoformat(date_str) + except ValueError: + # If parsing fails, we fall back to strptime with a defined format. Adjust + # the format as necessary. + last_updated = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + # If your time is in a specific timezone, you can adjust it here. + # Assuming UTC for this example. + last_updated = last_updated.replace(tzinfo=timezone.utc) + + return last_updated.replace(tzinfo=None)