Skip to content

Commit

Permalink
Split dev and prod deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
bram-vdberg committed Nov 4, 2024
1 parent ffdef79 commit b3d07e6
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 324 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,27 @@ jobs:
env:
PROD_DB_URL: ${{ secrets.PROD_DB_URL }}
BARN_DB_URL: ${{ secrets.BARN_DB_URL }}

prefect-pr-test-run:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]

env:
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
BRANCH_NAME: ${{ github.head_ref }}

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- run: |
python -m pip install --upgrade pip
pip install -r requirements/prefect.txt
pip install -r requirements/prod.txt
prefect config set PREFECT_API_URL=$PREFECT_API_URL
python -m src.deploy_prefect.pr_deployment
prefect deployment run 'dev-order-rewards/dune-sync-pr-order-rewards'
155 changes: 6 additions & 149 deletions src/deploy_prefect/dev_deployment.py
Original file line number Diff line number Diff line change
@@ -1,166 +1,23 @@
"""Prefect Deployment for Order Rewards Data"""

import os
from io import StringIO
from datetime import datetime, timedelta, timezone

import requests
import pandas as pd
from dotenv import load_dotenv
from dune_client.client import DuneClient

# pylint: disable=import-error
from prefect import flow, task, get_run_logger # type: ignore

from prefect import flow # type: ignore
from prefect.runner.storage import GitRepository # type: ignore

from src.models.block_range import BlockRange
from src.fetch.orderbook import OrderbookFetcher

load_dotenv()


def get_last_monday_midnight_utc() -> int:
"""Get the timestamp of last monday at midnight UTC"""
now = datetime.now(timezone.utc)
current_weekday = now.weekday()
days_since_last_monday = current_weekday if current_weekday != 0 else 7
last_monday = now - timedelta(days=days_since_last_monday)
last_monday_midnight = last_monday.replace(
hour=0, minute=0, second=0, microsecond=0
)
timestamp = int(last_monday_midnight.timestamp())
return timestamp


@task # type: ignore[misc]
def get_block_range() -> BlockRange:
"""Returns the blockrange from last monday midnight until now"""
etherscan_api = "https://api.etherscan.io/api"
api_key = os.environ["ETHERSCAN_API_KEY"]
start = (
requests.get(
etherscan_api,
{ # type: ignore
"module": "block",
"action": "getblocknobytime",
"timestamp": get_last_monday_midnight_utc(),
"closest": "before",
"apikey": api_key,
},
timeout=60,
)
.json()
.get("result")
)
end = (
requests.get(
etherscan_api,
{ # type: ignore
"module": "block",
"action": "getblocknobytime",
"timestamp": int(datetime.now(timezone.utc).timestamp()),
"closest": "before",
"apikey": api_key,
},
timeout=60,
)
.json()
.get("result")
)

blockrange = BlockRange(block_from=start, block_to=end)
return blockrange


@task # type: ignore[misc]
def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame:
"""Runs the query to get the order book for a specified blockrange"""
orderbook = OrderbookFetcher()
return orderbook.get_order_rewards(blockrange)


@task # type: ignore[misc]
def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str:
"""Casts the dataframe to a string in csv format for uploading to Dune"""
csv_buffer = StringIO()
orderbook.to_csv(csv_buffer, index=False)
return csv_buffer.getvalue()


@task # type: ignore[misc]
def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str:
"""
Uploads the order rewards data to Dune,
either creating a new query or updating an existing one
"""
table_name = f"order_rewards_{block_start}"
dune = DuneClient.from_env()
dune.upload_csv( # type: ignore[attr-defined]
data=data,
description=f"Order rewards data for blocks {block_start}-{block_end}",
table_name=table_name,
is_private=False,
)
return table_name


@task # type: ignore[misc]
def update_aggregate_query(table_name: str) -> None:
"""
Query example:
WITH aggregate AS (
SELECT * FROM dune.cowprotocol.order_rewards_1
UNION ALL
SELECT * FROM dune.cowprotocol.order_rewards_2
)
SELECT DISTINCT * FROM aggregate;
"""

logger = get_run_logger()
dune = DuneClient.from_env()
query_id = os.environ["AGGREGATE_QUERY_ID"]
query = dune.get_query(query_id) # type: ignore[attr-defined]
sql_query = query.sql

if table_name not in sql_query:
logger.info(f"Table name not found, updating table with {table_name}")
insertion_point = insertion_point = sql_query.rfind(")")
updated_sql_query = (
sql_query[:insertion_point].strip()
+ f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n"
+ sql_query[insertion_point:]
)
dune.update_query( # type: ignore[attr-defined]
query_sql=updated_sql_query, query_id=query_id
)
else:
logger.info("Table already in query, not updating query")


@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc]
def order_rewards() -> None:
"""Defines a flow for updating the order_rewards table"""
blockrange = get_block_range()
orderbook = fetch_orderbook(blockrange)
data = cast_orderbook_to_dune_string(orderbook)
table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to)
update_aggregate_query(table_name)


# pylint: disable=duplicate-code
if __name__ == "__main__":
git_source = GitRepository(
url="https://github.com/cowprotocol/dune-sync.git",
branch="dev",
)
flow.from_source(
source=git_source,
entrypoint="src/deploy_prefect/dev_deployment.py:order_rewards",
entrypoint="src/deploy_prefect/flows.py:dev_order_rewards",
).deploy(
name="dune-sync-dev-order-rewards",
work_pool_name="cowbarn",
cron="*/30 * * * *", # Every 30 minutes
tags=["solver", "dune-sync"],
tags=["dev", "solver", "dune-sync"],
description="Run the dune sync order_rewards query",
version="0.0.1",
version="0.0.2",
)
35 changes: 35 additions & 0 deletions src/deploy_prefect/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Prefect Deployment for Order Rewards Data"""
# pylint: disable=import-error
from prefect import flow # type: ignore

from src.deploy_prefect.tasks import (
get_block_range,
fetch_orderbook,
cast_orderbook_to_dune_string,
upload_data_to_dune,
update_aggregate_query,
)
from src.deploy_prefect.models import ENV, CHAIN, Config


@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc]
def dev_order_rewards() -> None:
"""Defines a flow for updating the order_rewards table"""
config = Config(CHAIN.MAINNET, ENV.DEV)

blockrange = get_block_range()
orderbook = fetch_orderbook(blockrange)
data = cast_orderbook_to_dune_string(orderbook)
table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to)
update_aggregate_query(table_name, config)


@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc]
def prod_order_rewards() -> None:
"""Defines a flow for updating the order_rewards table"""
config = Config(CHAIN.MAINNET, ENV.PROD)
blockrange = get_block_range()
orderbook = fetch_orderbook(blockrange)
data = cast_orderbook_to_dune_string(orderbook)
table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to)
update_aggregate_query(table_name, config)
2 changes: 1 addition & 1 deletion src/deploy_prefect/local_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# pylint: disable=import-error
from prefect import flow # type: ignore
from dotenv import load_dotenv
from src.deploy_prefect.deployment import (
from src.deploy_prefect.tasks import (
get_block_range,
fetch_orderbook,
cast_orderbook_to_dune_string,
Expand Down
90 changes: 90 additions & 0 deletions src/deploy_prefect/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Dataclasses for the prefect deployments"""
import os
from enum import Enum
from dataclasses import dataclass, field

from dotenv import load_dotenv

load_dotenv()


class ENV(Enum):
"""
Enum ENV class to change environment variables for DEV And PROD
"""

DEV = "DEV"
PR = "PR"
PROD = "PROD"

def is_dev(self) -> bool:
"""Check if the environment is DEV."""
return self == ENV.DEV

def is_pr(self) -> bool:
"""Check if the environment is PR."""
return self == ENV.PR

def is_prod(self) -> bool:
"""Check if the environment is PROD."""
return self == ENV.PROD


class CHAIN(Enum):
"""
Enum CHAIN class to change environment variables different chains
"""

ARBITRUM = "ARBITRUM"
GNOSIS = "GNOSIS"
MAINNET = "MAINNET"

def is_arbitrum(self) -> bool:
"""Check if the chain is Arbitrum"""
return self == CHAIN.ARBITRUM

def is_gnosis(self) -> bool:
"""Check if the chain is Gnosis"""
return self == CHAIN.GNOSIS

def is_mainnet(self) -> bool:
"""Check if the chain is mainnet"""
return self == CHAIN.MAINNET


@dataclass(frozen=True)
class Config:
"""
Config dataclass to setup config based on chain&env combination.
"""

_chain: CHAIN
_env: ENV

_dune_query_id: str = field(init=False)
_etherscan_api_key: str = field(init=False)

def __post_init__(self) -> None:
etherscan_api_value = os.environ["ETHERSCAN_API_KEY"]

if self._env.is_dev():
dune_query_id_value = os.environ["AGGREGATE_QUERY_DEV_ID"]
elif self._env.is_pr():
dune_query_id_value = os.environ["AGGREGATE_QUERY_PR_ID"]
elif self._env.is_prod():
dune_query_id_value = os.environ["AGGREGATE_QUERY_ID"]
else:
raise ValueError("ENV is neither DEV, PR, nor PROD")

object.__setattr__(self, "_etherscan_api_key", etherscan_api_value)
object.__setattr__(self, "_dune_query_id", dune_query_id_value)

@property
def etherscan_api_key(self) -> str:
"""Etherscan API key getter"""
return self._etherscan_api_key

@property
def dune_query_id(self) -> str:
"""Dune Aggregate Query Getter"""
return self._dune_query_id
25 changes: 25 additions & 0 deletions src/deploy_prefect/pr_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Prefect Deployment for Order Rewards Data"""
import os

# pylint: disable=import-error
from prefect import flow # type: ignore
from prefect.runner.storage import GitRepository # type: ignore


# pylint: disable=duplicate-code
if __name__ == "__main__":
branch_name = os.getenv("BRANCH_NAME")
git_source = GitRepository(
url="https://github.com/cowprotocol/dune-sync.git",
branch=branch_name,
)
flow.from_source(
source=git_source,
entrypoint="src/deploy_prefect/flows.py:dev_order_rewards",
).deploy(
name="dune-sync-pr-order-rewards",
work_pool_name="cowbarn",
tags=["dev", "solver", "dune-sync"],
description="Run the dune sync order_rewards query",
version="0.0.2",
)
Loading

0 comments on commit b3d07e6

Please sign in to comment.