From 2c086e587e75d59bf35a55d7248197e0c2200e7a Mon Sep 17 00:00:00 2001
From: harisang
Date: Sat, 7 Dec 2024 01:14:44 +0200
Subject: [PATCH] add batch and order data jobs

---
 src/dbt/__init__.py       |   0
 src/dbt/batch_data.py     |  50 ++++++++++
 src/dbt/common.py         | 125 ++++++++++++++++++++++++
 src/dbt/sync_data.py      |  58 +++++++++++
 src/fetch/orderbook.py    | 197 ++++++++++++++++++++++++++++++++++++++
 src/models/block_range.py |  31 ++++++
 src/models/tables.py      |  18 ++++
 7 files changed, 479 insertions(+)
 create mode 100644 src/dbt/__init__.py
 create mode 100644 src/dbt/batch_data.py
 create mode 100644 src/dbt/common.py
 create mode 100644 src/dbt/sync_data.py
 create mode 100644 src/fetch/orderbook.py
 create mode 100644 src/models/block_range.py
 create mode 100644 src/models/tables.py

diff --git a/src/dbt/__init__.py b/src/dbt/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/dbt/batch_data.py b/src/dbt/batch_data.py
new file mode 100644
index 00000000..275feaca
--- /dev/null
+++ b/src/dbt/batch_data.py
@@ -0,0 +1,50 @@
+"""Main entry point for batch data sync"""
+
+import os
+from dotenv import load_dotenv
+from web3 import Web3
+from src.fetch.orderbook import OrderbookFetcher, OrderbookEnv
+from src.logger import set_log
+from src.dbt.common import compute_block_and_month_range
+from src.models.block_range import BlockRange
+
+
+log = set_log(__name__)
+
+
+async def sync_batch_data(
+    node: Web3,
+    orderbook: OrderbookFetcher,
+    network: str,
+    recompute_previous_month: bool,
+) -> None:
+    """
+    Batch data sync logic. The recompute_previous_month flag, when enabled, forces a recomputation
+    of the previous month. If it is set to False, the previous month is still recomputed when the
+    current date is the first day of the current month.
+    """
+
+    block_range_list, months_list, is_even = compute_block_and_month_range(
+        node, recompute_previous_month
+    )
+    for i, _ in enumerate(block_range_list):
+        start_block = block_range_list[i][0]
+        end_block = block_range_list[i][1]
+        if is_even[i]:
+            table_name = "raw_batch_data_latest_even_month_" + network.lower()
+        else:
+            table_name = "raw_batch_data_latest_odd_month_" + network.lower()
+        block_range = BlockRange(block_from=start_block, block_to=end_block)
+        log.info(
+            f"About to process block range ({start_block}, {end_block}) for month {months_list[i]}"
+        )
+        batch_data = orderbook.get_batch_data(block_range)
+        log.info("SQL query successfully executed. About to update analytics table.")
+        batch_data.to_sql(
+            table_name,
+            orderbook._pg_engine(OrderbookEnv.ANALYTICS),
+            if_exists="replace",
+        )
+        log.info(
+            f"batch data sync run completed successfully for month {months_list[i]}"
+        )
diff --git a/src/dbt/common.py b/src/dbt/common.py
new file mode 100644
index 00000000..c40465cf
--- /dev/null
+++ b/src/dbt/common.py
@@ -0,0 +1,125 @@
+"""Shared methods between both sync scripts."""
+
+from datetime import datetime, timezone
+from typing import List, Tuple
+from web3 import Web3
+from src.logger import set_log
+from src.models.tables import SyncTable
+
+log = set_log(__name__)
+
+def node_suffix(network: str) -> str:
+    """
+    Converts network internal name to name used for nodes and dune tables
+    """
+    if network == "mainnet":
+        return "ETHEREUM"
+    if network == "xdai":
+        return "GNOSIS"
+    if network == "arbitrum-one":
+        return "ARBITRUM"
+    return ""
+
+def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
+    """
+    This implements binary search and returns the smallest block number
+    whose timestamp is at least as large as the time_stamp argument passed in the function
+    """
+    end_block_number = int(node.eth.get_block("finalized")["number"])
+    start_block_number = 1
+    close_in_seconds = 30
+
+    while True:
+        mid_block_number = (start_block_number + end_block_number) // 2
+        block = node.eth.get_block(mid_block_number)
+        block_time = block["timestamp"]
+        difference_in_seconds = int((time_stamp - block_time))
+
+        if abs(difference_in_seconds) < close_in_seconds:
+            break
+
+        if difference_in_seconds < 0:
+            end_block_number = mid_block_number - 1
+        else:
+            start_block_number = mid_block_number + 1
+
+    ## we now brute-force to ensure we have found the right block
+    for b in range(mid_block_number - 200, mid_block_number + 200):
+        block = node.eth.get_block(b)
+        block_time_stamp = block["timestamp"]
+        if block_time_stamp >= time_stamp:
+            return int(block["number"])
+    # fallback in case correct block number hasn't been found
+    # in that case, we will include some more blocks than necessary
+    return mid_block_number + 200
+
+
+def compute_block_and_month_range(  # pylint: disable=too-many-locals
+    node: Web3, recompute_previous_month: bool
+) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
+    """
+    This determines the block range and the relevant months
+    for which we will compute and upload data on Dune.
+    """
+    # We first compute the relevant block range
+    # Here, we assume that the job runs at least once every 24h
+    # Because of that, if it is the first day of the month, we also
+    # compute the previous month's table just to be on the safe side
+
+    latest_finalized_block = node.eth.get_block("finalized")
+
+    current_month_end_block = int(latest_finalized_block["number"])
+    current_month_end_timestamp = latest_finalized_block["timestamp"]
+
+    current_month_end_datetime = datetime.fromtimestamp(
+        current_month_end_timestamp, tz=timezone.utc
+    )
+    current_month_start_datetime = datetime(
+        current_month_end_datetime.year, current_month_end_datetime.month, 1, 00, 00
+    )
+    current_month_start_timestamp = current_month_start_datetime.replace(
+        tzinfo=timezone.utc
+    ).timestamp()
+
+    current_month_start_block = find_block_with_timestamp(
+        node, current_month_start_timestamp
+    )
+
+    current_month = (
+        f"{current_month_end_datetime.year}_{current_month_end_datetime.month}"
+    )
+    if current_month_end_datetime.month % 2 == 0:
+        is_even = [True]
+    else:
+        is_even = [False]
+    months_list = [current_month]
+    block_range = [(current_month_start_block, current_month_end_block)]
+    if current_month_end_datetime.day == 1 or recompute_previous_month:
+        is_even.append(not is_even[0])
+        if current_month_end_datetime.month == 1:
+            previous_month = f"{current_month_end_datetime.year - 1}_12"
+            previous_month_start_datetime = datetime(
+                current_month_end_datetime.year - 1, 12, 1, 00, 00
+            )
+        else:
+            previous_month = (
+                f"{current_month_end_datetime.year}_{current_month_end_datetime.month - 1}"
+            )
+            previous_month_start_datetime = datetime(
+                current_month_end_datetime.year,
+                current_month_end_datetime.month - 1,
+                1,
+                00,
+                00,
+            )
+        months_list.append(previous_month)
+        previous_month_start_timestamp = previous_month_start_datetime.replace(
+            tzinfo=timezone.utc
+        ).timestamp()
+        previous_month_start_block = find_block_with_timestamp(
+            node, previous_month_start_timestamp
+        )
+        previous_month_end_block = current_month_start_block
+        block_range.append((previous_month_start_block, previous_month_end_block))
+
+    return block_range, months_list, is_even
diff --git a/src/dbt/sync_data.py b/src/dbt/sync_data.py
new file mode 100644
index 00000000..d38dad61
--- /dev/null
+++ b/src/dbt/sync_data.py
@@ -0,0 +1,58 @@
+"""Main entry point for data sync jobs"""
+
+import argparse
+import asyncio
+import os
+from dataclasses import dataclass
+
+from dotenv import load_dotenv
+from web3 import Web3
+
+from src.fetch.orderbook import OrderbookFetcher
+from src.logger import set_log
+from src.models.tables import SyncTable
+from src.dbt.batch_data import sync_batch_data
+from src.dbt.common import node_suffix
+
+log = set_log(__name__)
+
+
+@dataclass
+class ScriptArgs:
+    """Runtime arguments' parser/initializer"""
+
+    sync_table: SyncTable
+
+    def __init__(self) -> None:
+        parser = argparse.ArgumentParser("Dune Community Sources Sync")
+        parser.add_argument(
+            "--sync-table",
+            type=SyncTable,
+            required=True,
+            choices=list(SyncTable),
+        )
+        arguments, _ = parser.parse_known_args()
+        self.sync_table: SyncTable = arguments.sync_table
+
+
+def sync_data() -> None:
+    """
+    Main function
+    """
+    load_dotenv()
+    args = ScriptArgs()
+    orderbook = OrderbookFetcher()
+    network = node_suffix(os.environ.get("NETWORK", "mainnet"))
+    log.info(f"Network is set to: {network}")
+    web3 = Web3(Web3.HTTPProvider(os.environ.get("NODE_URL" + "_" + network)))
+
+    if args.sync_table == SyncTable.BATCH_DATA:
+        asyncio.run(
+            sync_batch_data(web3, orderbook, network, recompute_previous_month=False)
+        )
+    else:
+        log.error(f"unsupported sync_table '{args.sync_table}'")
+
+
+if __name__ == "__main__":
+    sync_data()
diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py
new file mode 100644
index 00000000..3fad6ada
--- /dev/null
+++ b/src/fetch/orderbook.py
@@ -0,0 +1,197 @@
+"""Basic client for connecting to postgres database with login credentials"""
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+import pandas as pd
+from dotenv import load_dotenv
+from pandas import DataFrame
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+
+from src.logger import set_log
+from src.models.block_range import BlockRange
+from src.utils.query_file import open_query
+from src.dbt.common import node_suffix
+
+log = set_log(__name__)
+
+MAX_PROCESSING_DELAY = 10
+BUCKET_SIZE = {"mainnet": 10000, "xdai": 30000, "arbitrum-one": 1000000}
+
+
+class OrderbookEnv(Enum):
+    """
+    Enum for distinguishing between CoW Protocol's staging and production environment
+    """
+
+    BARN = "BARN"
+    PROD = "PROD"
+    ANALYTICS = "ANALYTICS"
+
+    def __str__(self) -> str:
+        return str(self.value)
+
+
+@dataclass
+class OrderbookFetcher:
+    """
+    A pair of Dataframes primarily intended to store query results
+    from production and staging orderbook databases
+    """
+
+    @staticmethod
+    def _pg_engine(db_env: OrderbookEnv) -> Engine:
+        """Returns a connection to postgres database"""
+        load_dotenv()
+        if db_env == OrderbookEnv.ANALYTICS:
+            db_url = os.environ["ANALYTICS_DB_URL"]
+            db_string = f"postgresql+psycopg2://{db_url}"
+            return create_engine(db_string)
+
+        if "NETWORK" in os.environ:
+            db_url = (
+                os.environ[f"{db_env}_DB_URL"]
+                + "/"
+                + os.environ.get("NETWORK", "mainnet")
+            )
+        else:
+            db_url = os.environ[f"{db_env}_DB_URL"]
+        db_string = f"postgresql+psycopg2://{db_url}"
+        return create_engine(db_string)
+
+    @classmethod
+    def _read_query_for_env(
+        cls, query: str, env: OrderbookEnv, data_types: Optional[dict[str, str]] = None
+    ) -> DataFrame:
+        return pd.read_sql_query(query, con=cls._pg_engine(env), dtype=data_types)
+
+    @classmethod
+    def _query_both_dbs(
+        cls,
+        query_prod: str,
+        query_barn: str,
+        data_types: Optional[dict[str, str]] = None,
+    ) -> tuple[DataFrame, DataFrame]:
+        barn = cls._read_query_for_env(query_barn, OrderbookEnv.BARN, data_types)
+        prod = cls._read_query_for_env(query_prod, OrderbookEnv.PROD, data_types)
+        return barn, prod
+
+    @classmethod
+    def get_order_rewards(cls, block_range: BlockRange) -> DataFrame:
+        """
+        Fetches and validates Order Reward DataFrame as concatenation from Prod and Staging DB
+        """
+        cow_reward_query_prod = (
+            open_query("orderbook/prod_order_rewards.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+        )
+        cow_reward_query_barn = (
+            open_query("orderbook/barn_order_rewards.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+        )
+        data_types = {"block_number": "int64", "amount": "float64"}
+        barn, prod = cls._query_both_dbs(
+            cow_reward_query_prod, cow_reward_query_barn, data_types
+        )
+
+        # Warn if solvers appear in both environments.
+        if not set(prod.solver).isdisjoint(set(barn.solver)):
+            log.warning(
+                f"solver overlap in {block_range}: solvers "
+                f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn"
+            )
+
+        if not prod.empty and not barn.empty:
+            return pd.concat([prod, barn])
+        if not prod.empty:
+            return prod.copy()
+        if not barn.empty:
+            return barn.copy()
+        return pd.DataFrame()
+
+    @classmethod
+    def run_batch_data_query(cls, block_range: BlockRange) -> DataFrame:
+        """
+        Fetches and validates Batch data DataFrame as concatenation from Prod and Staging DB
+        """
+        load_dotenv()
+        network = node_suffix(os.environ["NETWORK"])
+        epsilon_upper = str(os.environ[f"EPSILON_UPPER_{network}"])
+        epsilon_lower = str(os.environ[f"EPSILON_LOWER_{network}"])
+        batch_data_query_prod = (
+            open_query("orderbook/batch_data.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+            .replace(
+                "{{EPSILON_LOWER}}", epsilon_lower
+            )  # lower ETH cap for payment (in WEI)
+            .replace(
+                "{{EPSILON_UPPER}}", epsilon_upper
+            )  # upper ETH cap for payment (in WEI)
+            .replace("{{env}}", "prod")
+        )
+        batch_data_query_barn = (
+            open_query("orderbook/batch_data.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+            .replace(
+                "{{EPSILON_LOWER}}", epsilon_lower
+            )  # lower ETH cap for payment (in WEI)
+            .replace(
+                "{{EPSILON_UPPER}}", epsilon_upper
+            )  # upper ETH cap for payment (in WEI)
+            .replace("{{env}}", "barn")
+        )
+        data_types = {
+            # According to this: https://stackoverflow.com/a/11548224
+            # capitalized int64 means `Optional` and it appears to work.
+            "block_number": "Int64",
+            "block_deadline": "int64",
+        }
+        barn, prod = cls._query_both_dbs(
+            batch_data_query_prod, batch_data_query_barn, data_types
+        )
+
+        # Warn if solvers appear in both environments.
+        if not set(prod.solver).isdisjoint(set(barn.solver)):
+            log.warning(
+                f"solver overlap in {block_range}: solvers "
+                f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn"
+            )
+
+        if not prod.empty and not barn.empty:
+            return pd.concat([prod, barn])
+        if not prod.empty:
+            return prod.copy()
+        if not barn.empty:
+            return barn.copy()
+        return pd.DataFrame()
+
+    @classmethod
+    def get_batch_data(cls, block_range: BlockRange) -> DataFrame:
+        """
+        Decomposes the block range into buckets of BUCKET_SIZE blocks (network-dependent),
+        so as to ensure the batch data query runs fast enough.
+        At the end, it concatenates everything into one data frame.
+        """
+        load_dotenv()
+        start = block_range.block_from
+        end = block_range.block_to
+        bucket_size = BUCKET_SIZE[os.environ.get("NETWORK", "mainnet")]
+        res = []
+        while start < end:
+            size = min(end - start, bucket_size)
+            log.info(f"About to process block range ({start}, {start + size})")
+            res.append(
+                cls.run_batch_data_query(
+                    BlockRange(block_from=start, block_to=start + size)
+                )
+            )
+            start = start + size
+        return pd.concat(res)
diff --git a/src/models/block_range.py b/src/models/block_range.py
new file mode 100644
index 00000000..25f7a89b
--- /dev/null
+++ b/src/models/block_range.py
@@ -0,0 +1,31 @@
+"""
+BlockRange Model is just a data class for left and right bounds
+"""
+from dataclasses import dataclass
+
+from dune_client.types import QueryParameter
+
+
+@dataclass
+class BlockRange:
+    """
+    Basic dataclass for an Ethereum block range with some Dune compatibility methods.
+    TODO (easy) - this data class could probably live in dune-client.
+    https://github.com/cowprotocol/dune-bridge/issues/40
+    """
+
+    block_from: int
+    block_to: int
+
+    def __str__(self) -> str:
+        return f"BlockRange(from={self.block_from}, to={self.block_to})"
+
+    def __repr__(self) -> str:
+        return str(self)
+
+    def as_query_params(self) -> list[QueryParameter]:
+        """Returns self as Dune QueryParameters"""
+        return [
+            QueryParameter.number_type("BlockFrom", self.block_from),
+            QueryParameter.number_type("BlockTo", self.block_to),
+        ]
diff --git a/src/models/tables.py b/src/models/tables.py
new file mode 100644
index 00000000..d7721452
--- /dev/null
+++ b/src/models/tables.py
@@ -0,0 +1,18 @@
+"""Data structure containing the supported sync tables"""
+from enum import Enum
+
+
+class SyncTable(Enum):
+    """Enum for Deployment Supported Table Sync"""
+
+    ORDER_REWARDS = "order_rewards"
+    BATCH_REWARDS = "batch_rewards"
+    BATCH_DATA = "batch_data"
+
+    def __str__(self) -> str:
+        return str(self.value)
+
+    @staticmethod
+    def supported_tables() -> list[str]:
+        """Returns a list of supported tables (i.e. valid object constructors)."""
+        return [str(t) for t in list(SyncTable)]
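
Reviewer's note (not part of the patch): a minimal sketch of how the new batch data job
could be exercised locally. It simply mirrors what src/dbt/sync_data.py does when run as
`python -m src.dbt.sync_data --sync-table batch_data`, and assumes the environment
variables referenced by the new modules are set (NODE_URL_ETHEREUM, PROD_DB_URL,
BARN_DB_URL, ANALYTICS_DB_URL, EPSILON_LOWER_ETHEREUM, EPSILON_UPPER_ETHEREUM); the
values are placeholders for a mainnet setup.

import asyncio
import os

from web3 import Web3

from src.dbt.batch_data import sync_batch_data
from src.dbt.common import node_suffix
from src.fetch.orderbook import OrderbookFetcher

# Resolve the node suffix ("ETHEREUM" for mainnet) and connect to the node,
# exactly as sync_data() does in this patch.
os.environ.setdefault("NETWORK", "mainnet")
network = node_suffix(os.environ["NETWORK"])
web3 = Web3(Web3.HTTPProvider(os.environ["NODE_URL_" + network]))

# Run the sync once; the previous month is additionally recomputed on the first
# day of the month or when recompute_previous_month is set to True.
asyncio.run(
    sync_batch_data(web3, OrderbookFetcher(), network, recompute_previous_month=False)
)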