Skip to content

Commit

Permalink
Split prefect deployments into dev and prod
Browse files Browse the repository at this point in the history
  • Loading branch information
bram-vdberg committed Oct 30, 2024
1 parent 8d0e449 commit ffdef79
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/prefect-dev.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: prefect-dev
on:
push:
branches: [ dev ]
jobs:
dev-deployment:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]

steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Deploy prefect deployment
env:
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
run: |
python -m pip install --upgrade pip
pip install -r requirements/prefect.txt
pip install -r requirements/prod.txt
prefect config set PREFECT_API_URL=$PREFECT_API_URL
python -m src.deploy_prefect.dev_deployment
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: prefect
name: prefect-prod
on:
push:
branches: [ main ]
jobs:
deployment:
prod-deployment:
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -24,4 +24,4 @@ jobs:
pip install -r requirements/prefect.txt
pip install -r requirements/prod.txt
prefect config set PREFECT_API_URL=$PREFECT_API_URL
python -m src.deploy_prefect.deployment
python -m src.deploy_prefect.prod_deployment
166 changes: 166 additions & 0 deletions src/deploy_prefect/dev_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Prefect Deployment for Order Rewards Data"""

import os
from io import StringIO
from datetime import datetime, timedelta, timezone

import requests
import pandas as pd
from dotenv import load_dotenv
from dune_client.client import DuneClient

# pylint: disable=import-error
from prefect import flow, task, get_run_logger # type: ignore

from prefect.runner.storage import GitRepository # type: ignore

from src.models.block_range import BlockRange
from src.fetch.orderbook import OrderbookFetcher

load_dotenv()


def get_last_monday_midnight_utc() -> int:
"""Get the timestamp of last monday at midnight UTC"""
now = datetime.now(timezone.utc)
current_weekday = now.weekday()
days_since_last_monday = current_weekday if current_weekday != 0 else 7
last_monday = now - timedelta(days=days_since_last_monday)
last_monday_midnight = last_monday.replace(
hour=0, minute=0, second=0, microsecond=0
)
timestamp = int(last_monday_midnight.timestamp())
return timestamp


@task # type: ignore[misc]
def get_block_range() -> BlockRange:
"""Returns the blockrange from last monday midnight until now"""
etherscan_api = "https://api.etherscan.io/api"
api_key = os.environ["ETHERSCAN_API_KEY"]
start = (
requests.get(
etherscan_api,
{ # type: ignore
"module": "block",
"action": "getblocknobytime",
"timestamp": get_last_monday_midnight_utc(),
"closest": "before",
"apikey": api_key,
},
timeout=60,
)
.json()
.get("result")
)
end = (
requests.get(
etherscan_api,
{ # type: ignore
"module": "block",
"action": "getblocknobytime",
"timestamp": int(datetime.now(timezone.utc).timestamp()),
"closest": "before",
"apikey": api_key,
},
timeout=60,
)
.json()
.get("result")
)

blockrange = BlockRange(block_from=start, block_to=end)
return blockrange


@task # type: ignore[misc]
def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame:
"""Runs the query to get the order book for a specified blockrange"""
orderbook = OrderbookFetcher()
return orderbook.get_order_rewards(blockrange)


@task # type: ignore[misc]
def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str:
"""Casts the dataframe to a string in csv format for uploading to Dune"""
csv_buffer = StringIO()
orderbook.to_csv(csv_buffer, index=False)
return csv_buffer.getvalue()


@task # type: ignore[misc]
def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str:
"""
Uploads the order rewards data to Dune,
either creating a new query or updating an existing one
"""
table_name = f"order_rewards_{block_start}"
dune = DuneClient.from_env()
dune.upload_csv( # type: ignore[attr-defined]
data=data,
description=f"Order rewards data for blocks {block_start}-{block_end}",
table_name=table_name,
is_private=False,
)
return table_name


@task # type: ignore[misc]
def update_aggregate_query(table_name: str) -> None:
"""
Query example:
WITH aggregate AS (
SELECT * FROM dune.cowprotocol.order_rewards_1
UNION ALL
SELECT * FROM dune.cowprotocol.order_rewards_2
)
SELECT DISTINCT * FROM aggregate;
"""

logger = get_run_logger()
dune = DuneClient.from_env()
query_id = os.environ["AGGREGATE_QUERY_ID"]
query = dune.get_query(query_id) # type: ignore[attr-defined]
sql_query = query.sql

if table_name not in sql_query:
logger.info(f"Table name not found, updating table with {table_name}")
insertion_point = insertion_point = sql_query.rfind(")")
updated_sql_query = (
sql_query[:insertion_point].strip()
+ f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n"
+ sql_query[insertion_point:]
)
dune.update_query( # type: ignore[attr-defined]
query_sql=updated_sql_query, query_id=query_id
)
else:
logger.info("Table already in query, not updating query")


@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc]
def order_rewards() -> None:
"""Defines a flow for updating the order_rewards table"""
blockrange = get_block_range()
orderbook = fetch_orderbook(blockrange)
data = cast_orderbook_to_dune_string(orderbook)
table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to)
update_aggregate_query(table_name)


if __name__ == "__main__":
git_source = GitRepository(
url="https://github.com/cowprotocol/dune-sync.git",
)
flow.from_source(
source=git_source,
entrypoint="src/deploy_prefect/dev_deployment.py:order_rewards",
).deploy(
name="dune-sync-dev-order-rewards",
work_pool_name="cowbarn",
cron="*/30 * * * *", # Every 30 minutes
tags=["solver", "dune-sync"],
description="Run the dune sync order_rewards query",
version="0.0.1",
)
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def order_rewards() -> None:
)
flow.from_source(
source=git_source,
entrypoint="src/deploy_prefect/deployment.py:order_rewards",
entrypoint="src/deploy_prefect/prod_deployment.py:order_rewards",
).deploy(
name="dune-sync-prod-order-rewards",
work_pool_name="cowbarn",
Expand Down

0 comments on commit ffdef79

Please sign in to comment.