diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 5a84816f..a27bec2c 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -49,3 +49,27 @@ jobs: env: PROD_DB_URL: ${{ secrets.PROD_DB_URL }} BARN_DB_URL: ${{ secrets.BARN_DB_URL }} + + prefect-pr-test-run: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + + env: + PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} + BRANCH_NAME: ${{ github.head_ref }} + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - run: | + python -m pip install --upgrade pip + pip install -r requirements/prefect.txt + pip install -r requirements/prod.txt + prefect config set PREFECT_API_URL=$PREFECT_API_URL + python -m src.deploy_prefect.pr_deployment + prefect deployment run 'dev-order-rewards/dune-sync-pr-order-rewards' diff --git a/src/deploy_prefect/dev_deployment.py b/src/deploy_prefect/dev_deployment.py index c7ba4249..76fd0319 100644 --- a/src/deploy_prefect/dev_deployment.py +++ b/src/deploy_prefect/dev_deployment.py @@ -1,166 +1,23 @@ """Prefect Deployment for Order Rewards Data""" - -import os -from io import StringIO -from datetime import datetime, timedelta, timezone - -import requests -import pandas as pd -from dotenv import load_dotenv -from dune_client.client import DuneClient - # pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - +from prefect import flow # type: ignore from prefect.runner.storage import GitRepository # type: ignore -from src.models.block_range import BlockRange -from src.fetch.orderbook import OrderbookFetcher - -load_dotenv() - - -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = 
current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - -@task # type: ignore[misc] -def get_block_range() -> BlockRange: - """Returns the blockrange from last monday midnight until now""" - etherscan_api = "https://api.etherscan.io/api" - api_key = os.environ["ETHERSCAN_API_KEY"] - start = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": get_last_monday_midnight_utc(), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - end = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": int(datetime.now(timezone.utc).timestamp()), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - - blockrange = BlockRange(block_from=start, block_to=end) - return blockrange - - -@task # type: ignore[misc] -def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: - """Runs the query to get the order book for a specified blockrange""" - orderbook = OrderbookFetcher() - return orderbook.get_order_rewards(blockrange) - - -@task # type: ignore[misc] -def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: - """Casts the dataframe to a string in csv format for uploading to Dune""" - csv_buffer = StringIO() - orderbook.to_csv(csv_buffer, index=False) - return csv_buffer.getvalue() - - -@task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: - """ - Uploads the order rewards data to Dune, - either creating a new query or updating an existing one - """ - table_name = f"order_rewards_{block_start}" - dune = DuneClient.from_env() - dune.upload_csv( # type: ignore[attr-defined] - 
data=data, - description=f"Order rewards data for blocks {block_start}-{block_end}", - table_name=table_name, - is_private=False, - ) - return table_name - - -@task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: - """ - Query example: - WITH aggregate AS ( - SELECT * FROM dune.cowprotocol.order_rewards_1 - UNION ALL - SELECT * FROM dune.cowprotocol.order_rewards_2 - ) - - SELECT DISTINCT * FROM aggregate; - """ - - logger = get_run_logger() - dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] - sql_query = query.sql - - if table_name not in sql_query: - logger.info(f"Table name not found, updating table with {table_name}") - insertion_point = insertion_point = sql_query.rfind(")") - updated_sql_query = ( - sql_query[:insertion_point].strip() - + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" - + sql_query[insertion_point:] - ) - dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id - ) - else: - logger.info("Table already in query, not updating query") - - -@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - +# pylint: disable=duplicate-code if __name__ == "__main__": git_source = GitRepository( url="https://github.com/cowprotocol/dune-sync.git", + branch="dev", ) flow.from_source( source=git_source, - entrypoint="src/deploy_prefect/dev_deployment.py:order_rewards", + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", ).deploy( name="dune-sync-dev-order-rewards", work_pool_name="cowbarn", cron="*/30 * * * *", # Every 
30 minutes - tags=["solver", "dune-sync"], + tags=["dev", "solver", "dune-sync"], description="Run the dune sync order_rewards query", - version="0.0.1", + version="0.0.2", ) diff --git a/src/deploy_prefect/flows.py b/src/deploy_prefect/flows.py new file mode 100644 index 00000000..a317c251 --- /dev/null +++ b/src/deploy_prefect/flows.py @@ -0,0 +1,35 @@ +"""Prefect Deployment for Order Rewards Data""" +# pylint: disable=import-error +from prefect import flow # type: ignore + +from src.deploy_prefect.tasks import ( + get_block_range, + fetch_orderbook, + cast_orderbook_to_dune_string, + upload_data_to_dune, + update_aggregate_query, +) +from src.deploy_prefect.models import ENV, CHAIN, Config + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def dev_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.DEV) + + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name, config) + + +@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] +def prod_order_rewards() -> None: + """Defines a flow for updating the order_rewards table""" + config = Config(CHAIN.MAINNET, ENV.PROD) + blockrange = get_block_range() + orderbook = fetch_orderbook(blockrange) + data = cast_orderbook_to_dune_string(orderbook) + table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) + update_aggregate_query(table_name, config) diff --git a/src/deploy_prefect/local_deploy.py b/src/deploy_prefect/local_deploy.py index 92700b59..a4e0a215 100644 --- a/src/deploy_prefect/local_deploy.py +++ b/src/deploy_prefect/local_deploy.py @@ -5,7 +5,7 @@ # pylint: disable=import-error from prefect import flow # type: ignore from dotenv import load_dotenv -from 
src.deploy_prefect.deployment import ( +from src.deploy_prefect.tasks import ( get_block_range, fetch_orderbook, cast_orderbook_to_dune_string, diff --git a/src/deploy_prefect/models.py b/src/deploy_prefect/models.py new file mode 100644 index 00000000..1c77f463 --- /dev/null +++ b/src/deploy_prefect/models.py @@ -0,0 +1,90 @@ +"""Dataclasses for the prefect deployments""" +import os +from enum import Enum +from dataclasses import dataclass, field + +from dotenv import load_dotenv + +load_dotenv() + + +class ENV(Enum): + """ + Enum ENV class to change environment variables for DEV And PROD + """ + + DEV = "DEV" + PR = "PR" + PROD = "PROD" + + def is_dev(self) -> bool: + """Check if the environment is DEV.""" + return self == ENV.DEV + + def is_pr(self) -> bool: + """Check if the environment is PR.""" + return self == ENV.PR + + def is_prod(self) -> bool: + """Check if the environment is PROD.""" + return self == ENV.PROD + + +class CHAIN(Enum): + """ + Enum CHAIN class to change environment variables different chains + """ + + ARBITRUM = "ARBITRUM" + GNOSIS = "GNOSIS" + MAINNET = "MAINNET" + + def is_arbitrum(self) -> bool: + """Check if the chain is Arbitrum""" + return self == CHAIN.ARBITRUM + + def is_gnosis(self) -> bool: + """Check if the chain is Gnosis""" + return self == CHAIN.GNOSIS + + def is_mainnet(self) -> bool: + """Check if the chain is mainnet""" + return self == CHAIN.MAINNET + + +@dataclass(frozen=True) +class Config: + """ + Config dataclass to setup config based on chain&env combination. 
+ """ + + _chain: CHAIN + _env: ENV + + _dune_query_id: str = field(init=False) + _etherscan_api_key: str = field(init=False) + + def __post_init__(self) -> None: + etherscan_api_value = os.environ["ETHERSCAN_API_KEY"] + + if self._env.is_dev(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_DEV_ID"] + elif self._env.is_pr(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_PR_ID"] + elif self._env.is_prod(): + dune_query_id_value = os.environ["AGGREGATE_QUERY_ID"] + else: + raise ValueError("ENV is neither DEV, PR, nor PROD") + + object.__setattr__(self, "_etherscan_api_key", etherscan_api_value) + object.__setattr__(self, "_dune_query_id", dune_query_id_value) + + @property + def etherscan_api_key(self) -> str: + """Etherscan API key getter""" + return self._etherscan_api_key + + @property + def dune_query_id(self) -> str: + """Dune Aggregate Query Getter""" + return self._dune_query_id diff --git a/src/deploy_prefect/pr_deployment.py b/src/deploy_prefect/pr_deployment.py new file mode 100644 index 00000000..ad8d3f52 --- /dev/null +++ b/src/deploy_prefect/pr_deployment.py @@ -0,0 +1,25 @@ +"""Prefect Deployment for Order Rewards Data""" +import os + +# pylint: disable=import-error +from prefect import flow # type: ignore +from prefect.runner.storage import GitRepository # type: ignore + + +# pylint: disable=duplicate-code +if __name__ == "__main__": + branch_name = os.getenv("BRANCH_NAME") + git_source = GitRepository( + url="https://github.com/cowprotocol/dune-sync.git", + branch=branch_name, + ) + flow.from_source( + source=git_source, + entrypoint="src/deploy_prefect/flows.py:dev_order_rewards", + ).deploy( + name="dune-sync-pr-order-rewards", + work_pool_name="cowbarn", + tags=["dev", "solver", "dune-sync"], + description="Run the dune sync order_rewards query", + version="0.0.2", + ) diff --git a/src/deploy_prefect/prod_deployment.py b/src/deploy_prefect/prod_deployment.py index 4ec3ab69..3453ab0e 100644 --- a/src/deploy_prefect/prod_deployment.py 
+++ b/src/deploy_prefect/prod_deployment.py @@ -1,166 +1,22 @@ """Prefect Deployment for Order Rewards Data""" - -import os -from io import StringIO -from datetime import datetime, timedelta, timezone - -import requests -import pandas as pd -from dotenv import load_dotenv -from dune_client.client import DuneClient - # pylint: disable=import-error -from prefect import flow, task, get_run_logger # type: ignore - +from prefect import flow # type: ignore from prefect.runner.storage import GitRepository # type: ignore -from src.models.block_range import BlockRange -from src.fetch.orderbook import OrderbookFetcher - -load_dotenv() - - -def get_last_monday_midnight_utc() -> int: - """Get the timestamp of last monday at midnight UTC""" - now = datetime.now(timezone.utc) - current_weekday = now.weekday() - days_since_last_monday = current_weekday if current_weekday != 0 else 7 - last_monday = now - timedelta(days=days_since_last_monday) - last_monday_midnight = last_monday.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - timestamp = int(last_monday_midnight.timestamp()) - return timestamp - - -@task # type: ignore[misc] -def get_block_range() -> BlockRange: - """Returns the blockrange from last monday midnight until now""" - etherscan_api = "https://api.etherscan.io/api" - api_key = os.environ["ETHERSCAN_API_KEY"] - start = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": get_last_monday_midnight_utc(), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - end = ( - requests.get( - etherscan_api, - { # type: ignore - "module": "block", - "action": "getblocknobytime", - "timestamp": int(datetime.now(timezone.utc).timestamp()), - "closest": "before", - "apikey": api_key, - }, - timeout=60, - ) - .json() - .get("result") - ) - - blockrange = BlockRange(block_from=start, block_to=end) - return blockrange - - -@task # type: ignore[misc] -def 
fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: - """Runs the query to get the order book for a specified blockrange""" - orderbook = OrderbookFetcher() - return orderbook.get_order_rewards(blockrange) - - -@task # type: ignore[misc] -def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: - """Casts the dataframe to a string in csv format for uploading to Dune""" - csv_buffer = StringIO() - orderbook.to_csv(csv_buffer, index=False) - return csv_buffer.getvalue() - - -@task # type: ignore[misc] -def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: - """ - Uploads the order rewards data to Dune, - either creating a new query or updating an existing one - """ - table_name = f"order_rewards_{block_start}" - dune = DuneClient.from_env() - dune.upload_csv( # type: ignore[attr-defined] - data=data, - description=f"Order rewards data for blocks {block_start}-{block_end}", - table_name=table_name, - is_private=False, - ) - return table_name - - -@task # type: ignore[misc] -def update_aggregate_query(table_name: str) -> None: - """ - Query example: - WITH aggregate AS ( - SELECT * FROM dune.cowprotocol.order_rewards_1 - UNION ALL - SELECT * FROM dune.cowprotocol.order_rewards_2 - ) - - SELECT DISTINCT * FROM aggregate; - """ - - logger = get_run_logger() - dune = DuneClient.from_env() - query_id = os.environ["AGGREGATE_QUERY_ID"] - query = dune.get_query(query_id) # type: ignore[attr-defined] - sql_query = query.sql - - if table_name not in sql_query: - logger.info(f"Table name not found, updating table with {table_name}") - insertion_point = insertion_point = sql_query.rfind(")") - updated_sql_query = ( - sql_query[:insertion_point].strip() - + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" - + sql_query[insertion_point:] - ) - dune.update_query( # type: ignore[attr-defined] - query_sql=updated_sql_query, query_id=query_id - ) - else: - logger.info("Table already in query, not updating query") - - 
-@flow(retries=3, retry_delay_seconds=60, log_prints=True) # type: ignore[misc] -def order_rewards() -> None: - """Defines a flow for updating the order_rewards table""" - blockrange = get_block_range() - orderbook = fetch_orderbook(blockrange) - data = cast_orderbook_to_dune_string(orderbook) - table_name = upload_data_to_dune(data, blockrange.block_from, blockrange.block_to) - update_aggregate_query(table_name) - +# pylint: disable=duplicate-code if __name__ == "__main__": git_source = GitRepository( url="https://github.com/cowprotocol/dune-sync.git", ) flow.from_source( source=git_source, - entrypoint="src/deploy_prefect/prod_deployment.py:order_rewards", + entrypoint="src/deploy_prefect/flows.py:prod_order_rewards", ).deploy( name="dune-sync-prod-order-rewards", work_pool_name="cowbarn", cron="*/30 * * * *", # Every 30 minutes - tags=["solver", "dune-sync"], + tags=["prod", "solver", "dune-sync"], description="Run the dune sync order_rewards query", - version="0.0.1", + version="0.0.2", ) diff --git a/src/deploy_prefect/tasks.py b/src/deploy_prefect/tasks.py new file mode 100644 index 00000000..e1caeedc --- /dev/null +++ b/src/deploy_prefect/tasks.py @@ -0,0 +1,125 @@ +"""Prefect Deployment for Order Rewards Data""" + +import os +from io import StringIO +from datetime import datetime, timezone + +import requests +import pandas as pd +from dotenv import load_dotenv +from dune_client.client import DuneClient + +# pylint: disable=import-error +from prefect import task, get_run_logger # type: ignore + +from src.models.block_range import BlockRange +from src.fetch.orderbook import OrderbookFetcher +from src.deploy_prefect.utils import get_last_monday_midnight_utc +from src.deploy_prefect.models import Config + +load_dotenv() + + +@task # type: ignore[misc] +def get_block_range() -> BlockRange: + """Returns the blockrange from last monday midnight until now""" + etherscan_api = "https://api.etherscan.io/api" + api_key = os.environ["ETHERSCAN_API_KEY"] + start = ( + 
requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": get_last_monday_midnight_utc(), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + end = ( + requests.get( + etherscan_api, + { # type: ignore + "module": "block", + "action": "getblocknobytime", + "timestamp": int(datetime.now(timezone.utc).timestamp()), + "closest": "before", + "apikey": api_key, + }, + timeout=60, + ) + .json() + .get("result") + ) + + blockrange = BlockRange(block_from=start, block_to=end) + return blockrange + + +@task # type: ignore[misc] +def fetch_orderbook(blockrange: BlockRange) -> pd.DataFrame: + """Runs the query to get the order book for a specified blockrange""" + orderbook = OrderbookFetcher() + return orderbook.get_order_rewards(blockrange) + + +@task # type: ignore[misc] +def cast_orderbook_to_dune_string(orderbook: pd.DataFrame) -> str: + """Casts the dataframe to a string in csv format for uploading to Dune""" + csv_buffer = StringIO() + orderbook.to_csv(csv_buffer, index=False) + return csv_buffer.getvalue() + + +@task # type: ignore[misc] +def upload_data_to_dune(data: str, block_start: int, block_end: int) -> str: + """ + Uploads the order rewards data to Dune, + either creating a new query or updating an existing one + """ + table_name = f"order_rewards_{block_start}" + dune = DuneClient.from_env() + dune.upload_csv( # type: ignore[attr-defined] + data=data, + description=f"Order rewards data for blocks {block_start}-{block_end}", + table_name=table_name, + is_private=False, + ) + return table_name + + +@task # type: ignore[misc] +def update_aggregate_query(table_name: str, config: Config) -> None: + """ + Query example: + WITH aggregate AS ( + SELECT * FROM dune.cowprotocol.order_rewards_1 + UNION ALL + SELECT * FROM dune.cowprotocol.order_rewards_2 + ) + + SELECT DISTINCT * FROM aggregate; + """ + + logger = get_run_logger() + dune = DuneClient.from_env() + 
query = dune.get_query(config.dune_query_id) # type: ignore[attr-defined] + sql_query = query.sql + + if table_name not in sql_query: + logger.info(f"Table name not found, updating table with {table_name}") + insertion_point = sql_query.rfind(")") + updated_sql_query = ( + sql_query[:insertion_point].strip() + + f"\n UNION ALL\n SELECT * FROM dune.cowprotocol.dataset_{table_name}\n" + + sql_query[insertion_point:] + ) + dune.update_query( # type: ignore[attr-defined] + query_sql=updated_sql_query, query_id=config.dune_query_id + ) + else: + logger.info("Table already in query, not updating query") diff --git a/src/deploy_prefect/utils.py b/src/deploy_prefect/utils.py new file mode 100644 index 00000000..1d8ab0d2 --- /dev/null +++ b/src/deploy_prefect/utils.py @@ -0,0 +1,15 @@ +"""Prefect Deployment for Order Rewards Data""" +from datetime import datetime, timedelta, timezone + + +def get_last_monday_midnight_utc() -> int: + """Get the timestamp of last monday at midnight UTC""" + now = datetime.now(timezone.utc) + current_weekday = now.weekday() + days_since_last_monday = current_weekday if current_weekday != 0 else 7 + last_monday = now - timedelta(days=days_since_last_monday) + last_monday_midnight = last_monday.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + timestamp = int(last_monday_midnight.timestamp()) + return timestamp diff --git a/src/models/batch_rewards_schema.py b/src/models/batch_rewards_schema.py index 7f481c15..7001cc8d 100644 --- a/src/models/batch_rewards_schema.py +++ b/src/models/batch_rewards_schema.py @@ -35,7 +35,6 @@ def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]] "fee": int(row["network_fee"]), "winning_score": int(row["winning_score"]), "reference_score": int(row["reference_score"]), - "participating_solvers": row["participating_solvers"], }, } for row in rewards_df.to_dict(orient="records") diff --git a/tests/unit/test_batch_rewards_schema.py 
b/tests/unit/test_batch_rewards_schema.py index e7f7a171..64db1274 100644 --- a/tests/unit/test_batch_rewards_schema.py +++ b/tests/unit/test_batch_rewards_schema.py @@ -34,21 +34,6 @@ def test_order_rewards_transformation(self): "capped_payment": [-1000000000000000, -1000000000000000], "winning_score": [123456 * ONE_ETH, 6789 * ONE_ETH], "reference_score": [ONE_ETH, 2 * ONE_ETH], - "participating_solvers": [ - [ - "0x51", - "0x52", - "0x53", - ], - [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], - ], } ) @@ -61,7 +46,6 @@ def test_order_rewards_transformation(self): "capped_payment": -1000000000000000, "execution_cost": 9999000000000000000000, "fee": 1000000000000000, - "participating_solvers": ["0x51", "0x52", "0x53"], "protocol_fee": 2000000000000000, "reference_score": 1000000000000000000, "surplus": 2000000000000000000, @@ -78,14 +62,6 @@ def test_order_rewards_transformation(self): "capped_payment": -1000000000000000, "execution_cost": 1, "fee": max_uint, - "participating_solvers": [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], "protocol_fee": 0, "reference_score": 2000000000000000000, "surplus": 3000000000000000000,