Skip to content

Commit

Permalink
add batch and order data jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
harisang committed Dec 6, 2024
1 parent 4acb308 commit 2c086e5
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 0 deletions.
Empty file added src/dbt/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions src/dbt/batch_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Main Entry point for batch data sync"""

import os
from dotenv import load_dotenv
from web3 import Web3
from src.fetch.orderbook import OrderbookFetcher, OrderbookEnv
from src.logger import set_log
from src.dbt.common import compute_block_and_month_range
from src.models.block_range import BlockRange


log = set_log(__name__)


async def sync_batch_data(
    node: Web3,
    orderbook: OrderbookFetcher,
    network: str,
    recompute_previous_month: bool,
) -> None:
    """
    Sync batch data into the analytics database, one table per computed month.

    The recompute_previous_month flag, when enabled, forces a recomputation of
    the previous month. When set to False, the previous month is still
    recomputed whenever the current date is the first day of the month
    (that decision is made inside compute_block_and_month_range).
    """
    block_ranges, months, even_flags = compute_block_and_month_range(
        node, recompute_previous_month
    )
    for (block_from, block_to), month, is_even_month in zip(
        block_ranges, months, even_flags
    ):
        # Even and odd months are written to separate per-network tables.
        parity = "even" if is_even_month else "odd"
        table_name = f"raw_batch_data_latest_{parity}_month_{network.lower()}"
        block_range = BlockRange(block_from=block_from, block_to=block_to)
        log.info(
            f"About to process block range ({block_from}, {block_to}) for month {month}"
        )
        batch_data = orderbook.get_batch_data(block_range)
        log.info("SQL query successfully executed. About to update analytics table.")
        # NOTE(review): reaches into the fetcher's private engine accessor;
        # consider exposing a public engine getter on OrderbookFetcher.
        batch_data.to_sql(
            table_name,
            orderbook._pg_engine(OrderbookEnv.ANALYTICS),
            if_exists="replace",
        )
        log.info(
            f"batch data sync run completed successfully for month {month}"
        )
125 changes: 125 additions & 0 deletions src/dbt/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Shared methods between both sync scripts."""

from datetime import datetime, timezone
from typing import List, Tuple
from web3 import Web3
from src.logger import set_log
from src.models.tables import SyncTable

log = set_log(__name__)

def node_suffix(network: str) -> str:
    """
    Converts network internal name to name used for nodes and dune tables.

    Unrecognized networks map to the empty string.
    """
    # Closed mapping of supported internal network names to canonical suffixes.
    suffixes = {
        "mainnet": "ETHEREUM",
        "xdai": "GNOSIS",
        "arbitrum-one": "ARBITRUM",
    }
    return suffixes.get(network, "")

def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
    """
    Return the smallest block number whose timestamp is at least time_stamp.

    Strategy: binary search until we land within `close_in_seconds` of the
    target timestamp, then brute-force a window of +/- 200 blocks around the
    landing point to pin down the exact boundary block.

    Parameters:
        node: connected Web3 instance used to query blocks.
        time_stamp: target UNIX timestamp (seconds).

    Returns:
        The boundary block number; falls back to `mid_block_number + 200`
        (an over-inclusive bound) if the window scan does not find it.
    """
    end_block_number = int(node.eth.get_block("finalized")["number"])
    start_block_number = 1
    close_in_seconds = 30
    # Sensible default in case the search range is degenerate and the
    # loop body never executes.
    mid_block_number = end_block_number

    # Bounded binary search: terminates either when close enough to the
    # target or when the search interval is exhausted (the original
    # `while True` relied solely on the proximity break).
    while start_block_number <= end_block_number:
        mid_block_number = (start_block_number + end_block_number) // 2
        block = node.eth.get_block(mid_block_number)
        block_time = block["timestamp"]
        difference_in_seconds = int((time_stamp - block_time))

        if abs(difference_in_seconds) < close_in_seconds:
            break

        if difference_in_seconds < 0:
            end_block_number = mid_block_number - 1
        else:
            start_block_number = mid_block_number + 1

    ## we now brute-force to ensure we have found the right block
    # Clamp the window start to block 1: for very small mid_block_number the
    # original `mid_block_number - 200` could go non-positive and make the
    # node reject the block query.
    for b in range(max(1, mid_block_number - 200), mid_block_number + 200):
        block = node.eth.get_block(b)
        block_time_stamp = block["timestamp"]
        if block_time_stamp >= time_stamp:
            return int(block["number"])
    # fallback in case correct block number hasn't been found
    # in that case, we will include some more blocks than necessary
    return mid_block_number + 200


def compute_block_and_month_range(  # pylint: disable=too-many-locals
    node: Web3, recompute_previous_month: bool
) -> Tuple[List[Tuple[int, int]], List[str], List[bool]]:
    """
    Determine the block range(s) and the relevant month label(s)
    for which we will compute and upload data on Dune.

    Parameters:
        node: connected Web3 instance used to resolve block numbers.
        recompute_previous_month: force inclusion of the previous month.

    Returns:
        A triple of parallel lists:
        - list of (start_block, end_block) ranges,
        - list of "YYYY_M" month labels,
        - list of booleans flagging whether each month is an even month.
        The current month is always first; the previous month is appended
        when requested or on the first day of a month.
    """
    # We first compute the relevant block range
    # Here, we assume that the job runs at least once every 24h
    # Because of that, if it is the first day of month, we also
    # compute the previous month's table just to be on the safe side

    latest_finalized_block = node.eth.get_block("finalized")

    current_month_end_block = int(latest_finalized_block["number"])
    current_month_end_timestamp = latest_finalized_block["timestamp"]

    current_month_end_datetime = datetime.fromtimestamp(
        current_month_end_timestamp, tz=timezone.utc
    )
    current_month_start_datetime = datetime(
        current_month_end_datetime.year, current_month_end_datetime.month, 1, 00, 00
    )
    current_month_start_timestamp = current_month_start_datetime.replace(
        tzinfo=timezone.utc
    ).timestamp()

    current_month_start_block = find_block_with_timestamp(
        node, current_month_start_timestamp
    )

    current_month = (
        f"{current_month_end_datetime.year}_{current_month_end_datetime.month}"
    )
    # Parity of the month drives which of the two rotating tables is written.
    if current_month_end_datetime.month % 2 == 0:
        is_even = [True]
    else:
        is_even = [False]
    months_list = [current_month]
    block_range = [(current_month_start_block, current_month_end_block)]
    if current_month_end_datetime.day == 1 or recompute_previous_month:
        is_even.append(not is_even[0])
        if current_month_end_datetime.month == 1:
            # January rolls back to December of the previous year.
            previous_month = f"{current_month_end_datetime.year - 1}_12"
            previous_month_start_datetime = datetime(
                current_month_end_datetime.year - 1, 12, 1, 00, 00
            )
        else:
            # BUGFIX: this label was previously built with a multi-line
            # triple-quoted f-string, which embedded a newline and leading
            # spaces into the month label (e.g. "2024_\n   11\n").
            previous_month = (
                f"{current_month_end_datetime.year}_"
                f"{current_month_end_datetime.month - 1}"
            )
            previous_month_start_datetime = datetime(
                current_month_end_datetime.year,
                current_month_end_datetime.month - 1,
                1,
                00,
                00,
            )
        months_list.append(previous_month)
        previous_month_start_timestamp = previous_month_start_datetime.replace(
            tzinfo=timezone.utc
        ).timestamp()
        previous_month_start_block = find_block_with_timestamp(
            node, previous_month_start_timestamp
        )
        # The previous month ends exactly where the current month begins.
        previous_month_end_block = current_month_start_block
        block_range.append((previous_month_start_block, previous_month_end_block))

    return block_range, months_list, is_even
58 changes: 58 additions & 0 deletions src/dbt/sync_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Main Entry point for app_hash sync"""

import argparse
import asyncio
import os
from dataclasses import dataclass

from dotenv import load_dotenv
from web3 import Web3

from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.tables import SyncTable
from src.dbt.batch_data import sync_batch_data
from src.dbt.common import node_suffix

log = set_log(__name__)


@dataclass
class ScriptArgs:
    """Runtime arguments' parser/initializer"""

    # Which table the sync job should populate, parsed from --sync-table.
    sync_table: SyncTable

    def __init__(self) -> None:
        # Parse only the flags we know about; unknown argv entries are
        # tolerated (parse_known_args) so wrappers can pass extra options.
        arg_parser = argparse.ArgumentParser("Dune Community Sources Sync")
        arg_parser.add_argument(
            "--sync-table",
            type=SyncTable,
            required=True,
            choices=list(SyncTable),
        )
        parsed, _ = arg_parser.parse_known_args()
        self.sync_table: SyncTable = parsed.sync_table


def sync_data() -> None:
    """
    Main function: load environment config, construct the fetcher and node
    connection, and dispatch the sync job selected via --sync-table.
    """
    load_dotenv()
    args = ScriptArgs()
    orderbook = OrderbookFetcher()
    network = node_suffix(os.environ.get("NETWORK", "mainnet"))
    log.info(f"Network is set to: {network}")
    # Node URL is looked up per network, e.g. NODE_URL_ETHEREUM.
    node_url = os.environ.get(f"NODE_URL_{network}")
    web3 = Web3(Web3.HTTPProvider(node_url))

    if args.sync_table == SyncTable.BATCH_DATA:
        asyncio.run(
            sync_batch_data(web3, orderbook, network, recompute_previous_month=False)
        )
    else:
        log.error(f"unsupported sync_table '{args.sync_table}'")


if __name__ == "__main__":
sync_data()
Loading

0 comments on commit 2c086e5

Please sign in to comment.