Skip to content

Commit

Permalink
chore: update main cron and add legacy prices api
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Oct 27, 2023
1 parent 9cad852 commit c78dc64
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 54 deletions.
45 changes: 36 additions & 9 deletions icon_stats/api/v1/endpoints/exchanges_legacy.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,64 @@
from fastapi import APIRouter, Depends, Response
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlmodel import select
from pydantic import BaseModel
from datetime import datetime

from icon_stats.db import get_session
from icon_stats.models.exchanges_legacy import ExchangesLegacy
from icon_stats.models.cmc_cryptocurrency_quotes_latest import CmcListingsLatestQuote

router = APIRouter()


class ExchangesLegacyResponseData(BaseModel):
marketName: str
tradeName: str
createDate: datetime
price: float
prePrice: float
dailyRate: float | None
volume: float
marketCap: float


class ExchangesLegacyResponse(BaseModel):
result: int = 200
description: str = ""
totalData: int | None = None
data: dict
data: ExchangesLegacyResponseData


@router.get("/stats/exchanges/legacy")
@router.get("/stats/exchanges/legacy", response_model=ExchangesLegacyResponse)
async def get_exchange_prices(
response: Response,
session: AsyncSession = Depends(get_session),
) -> list[ExchangesLegacyResponse]:
) -> ExchangesLegacyResponse:
"""Return list of delegations."""
query = (
select(ExchangesLegacy)
select(CmcListingsLatestQuote)
.where(CmcListingsLatestQuote.base == "ICX") # noqa
.order_by(CmcListingsLatestQuote.last_updated.desc())
.limit(1)
)

result = await session.execute(query)
data = result.scalars().all()
data = result.scalars().first()

if not data:
raise HTTPException(status_code=status.HTTP_204_NO_CONTENT) # Raise an HTTPException for the 204 status

exchanges_legacy_response_data = ExchangesLegacyResponseData(
marketName="coinmarketcap",
tradeName="icxusd",
createDate=data.last_updated,
price=data.price,
prePrice=data.price + (data.price * data.percent_change_24h / 100),
dailyRate=None,
volume=data.volume_24h,
marketCap=data.market_cap,
)
exchanges_legacy = ExchangesLegacyResponse(
data=data,
data=exchanges_legacy_response_data,
)

return exchanges_legacy
5 changes: 4 additions & 1 deletion icon_stats/api/v1/router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from fastapi import APIRouter

from icon_stats.api.v1.endpoints import token_stats
from icon_stats.api.v1.endpoints import exchanges_legacy


api_router = APIRouter()
api_router.include_router(token_stats.router)
# api_router.include_router(token_stats.router)
api_router.include_router(exchanges_legacy.router)
11 changes: 5 additions & 6 deletions icon_stats/clients/configs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import BaseSettings
from pydantic_settings import BaseSettings, SettingsConfigDict

class BaseRestClientConfig(BaseSettings):
endpoint: str
Expand All @@ -11,7 +10,7 @@ class CmcClientConfig(BaseRestClientConfig):
api_key: str = ""
endpoint: str = "https://pro-api.coinmarketcap.com"

# model_config = SettingsConfigDict(
# case_sensitive=False,
# env_prefix='cmc_',
# )
model_config = SettingsConfigDict(
case_sensitive=False,
env_prefix='cmc_',
)
5 changes: 2 additions & 3 deletions icon_stats/crons/cmc_cryptocurrency_quotes_latest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from icon_stats.log import logger
from sqlalchemy.orm import Session

from icon_stats.utils.times import convert_str_date
from icon_stats.metrics import prom_metrics
from icon_stats.models.cmc_cryptocurrency_quotes_latest import CmcListingsLatestQuote
from icon_stats.clients.cmc import new_cmc_client
Expand All @@ -22,11 +23,9 @@ async def run_cmc_cryptocurrency_quotes_latest():
logger.info("Ending top tokens cron")
return

quote['last_updated'] = convert_str_date(quote['last_updated'])
exchanges_legacy = CmcListingsLatestQuote(base='ICX', quote='USD', **quote)

# session = get_session('stats')
# session.add(exchanges_legacy)
# session.merge()

await upsert_model(db_name='stats', model=exchanges_legacy)

Expand Down
2 changes: 1 addition & 1 deletion icon_stats/db_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def create_session_factories() -> dict[str, async_sessionmaker]:
connection_string = create_conn_str(**db_config.__dict__)
engine = create_async_engine(
connection_string,
connect_args={"options": f"-c search_path={db_config.schema_}"},
# connect_args={"options": f"-c search_path={db_config.schema_}"},
echo=True,
)
session_factory = async_sessionmaker(
Expand Down
32 changes: 17 additions & 15 deletions icon_stats/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@


def sink(message):
record = message.record
log_data = {
"timestamp": record["time"].strftime('%Y-%m-%d %H:%M:%S'),
}

if "structured" in record["extra"]:
log_data["data"] = record["extra"]["structured"]
else:
log_data["message"] = record["message"]

if config.LOG_FORMAT == "json":
return json.dumps(log_data, indent=config.STRUCTURED_SETTINGS.INDENT)
else:
return log_data["message"]

try:
record = message.record
log_data = {
"timestamp": record["time"].strftime('%Y-%m-%d %H:%M:%S'),
}

if "structured" in record["extra"]:
log_data["data"] = record["extra"]["structured"]
else:
log_data["message"] = record["message"]

if config.LOG_FORMAT == "json":
return json.dumps(log_data, indent=config.STRUCTURED_SETTINGS.INDENT)
else:
return log_data["message"]
except Exception as e:
pass

logger.remove()
logger.add(sink, level=config.LOG_LEVEL, enqueue=True)
Expand Down
42 changes: 25 additions & 17 deletions icon_stats/main_cron.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Callable, TypedDict

import asyncio
from typing import Callable, TypedDict, Coroutine, Any
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
Expand All @@ -9,28 +9,31 @@
from icon_stats.db import session_factory
from icon_stats.crons import (
top_tokens,
cmc_cryptocurrency_quotes_latest,
)

AsyncCallable = Callable[..., Coroutine[Any, Any, Any]]


class Cron(TypedDict):
func: Callable
func: AsyncCallable
interval: int


CRONS: list[Cron] = [
# {
# "func": top_tokens.run_top_tokens,
# "interval": 600,
# },
{
"func": top_tokens.run_top_tokens,
"interval": 600,
},
]
"func": cmc_cryptocurrency_quotes_latest.run_cmc_cryptocurrency_quotes_latest,
"interval": 60,
}


def run_cron_with_session(cron: Callable):
with session_factory() as session:
cron(session=session)
]


def main():
async def main():
logger.info("Starting metrics server.")
start_http_server(config.METRICS_PORT, config.METRICS_ADDRESS)

Expand All @@ -39,19 +42,24 @@ def main():

for i in CRONS:
# Run the jobs immediately in order
run_cron_with_session(i["func"])
await i["func"]()

# Then run them in the scheduler
sched.add_job(
func=run_cron_with_session,
func=i["func"],
trigger="interval",
args=[i["func"]],
# args=[i["func"]],
seconds=i["interval"],
id=i["func"].__name__,
)

pass
sched.start()
try:
while True:
await asyncio.sleep(60) # Sleep for a minute and check again.
except (KeyboardInterrupt, SystemExit):
pass


if __name__ == "__main__":
main()
asyncio.run(main())
6 changes: 4 additions & 2 deletions icon_stats/models/cmc_cryptocurrency_quotes_latest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime

from sqlalchemy.orm import declared_attr
from sqlalchemy.types import DateTime
from sqlalchemy import Column
from sqlmodel import Field, SQLModel


Expand All @@ -20,10 +22,10 @@ class CmcListingsLatestQuote(SQLModel, table=True):
market_cap_dominance: float
fully_diluted_market_cap: float
tvl: float | None
last_updated: datetime.datetime = Field(..., primary_key=True)
last_updated: datetime.datetime = Column(DateTime(timezone=True), primary_key=True)

__table_args__ = {'schema': 'stats'}

@declared_attr
def __tablename__(cls) -> str: # noqa: N805
return "cmc"
return "cmc_listings_latest_quote"
26 changes: 26 additions & 0 deletions icon_stats/utils/times.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from datetime import datetime, timezone

def convert_str_date(date_str: str) -> datetime:
# Check if the date string ends with 'Z' (which means it's in UTC)
if date_str.endswith("Z"):
# If it does, we remove the 'Z' and parse the datetime as UTC.
# The 'Z' is not supported by the fromisoformat method, so we have to handle it
# manually.
date_str = date_str.rstrip("Z")
last_updated = datetime.fromisoformat(date_str)
last_updated = last_updated.replace(tzinfo=timezone.utc)
else:
# If the string doesn't end with 'Z', we assume it's in local time (this may
# not be a correct assumption depending on your data)
# Here, you might want to handle other formats or timezones if necessary.
try:
last_updated = datetime.fromisoformat(date_str)
except ValueError:
# If parsing fails, we fall back to strptime with a defined format. Adjust
# the format as necessary.
last_updated = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
# If your time is in a specific timezone, you can adjust it here.
# Assuming UTC for this example.
last_updated = last_updated.replace(tzinfo=timezone.utc)

return last_updated.replace(tzinfo=None)

0 comments on commit c78dc64

Please sign in to comment.