Merge branch 'main' into reward_from_scores
harisang authored Jul 12, 2024
2 parents a5d2c99 + b778238 commit 612bf72
Showing 11 changed files with 85 additions and 257 deletions.
5 changes: 3 additions & 2 deletions .env.sample
@@ -1,6 +1,4 @@
VOLUME_PATH=data
APP_DATA_MAX_RETRIES=3
APP_DATA_GIVE_UP_THRESHOLD=100

# Dune credentials
DUNE_API_KEY=
@@ -19,5 +17,8 @@ AWS_BUCKET=
BARN_DB_URL={user}:{password}@{host}:{port}/{database}
PROD_DB_URL={user}:{password}@{host}:{port}/{database}

#Target table for app data sync
APP_DATA_TARGET_TABLE=app_data_mainnet

# IPFS Gateway
IPFS_ACCESS_KEY=
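
As a quick illustration (not part of the commit), the new variable is read straight from the environment in src/main.py; a minimal sketch, assuming the values above are loaded via python-dotenv:

```python
# Minimal sketch: how the new APP_DATA_TARGET_TABLE variable is consumed (mirrors src/main.py).
# The retired APP_DATA_MAX_RETRIES / APP_DATA_GIVE_UP_THRESHOLD knobs are no longer read.
import os

from dotenv import load_dotenv

load_dotenv()  # pulls VOLUME_PATH, DUNE_API_KEY, APP_DATA_TARGET_TABLE, ... from .env

table = os.environ["APP_DATA_TARGET_TABLE"]
assert table, "APP_DATA sync needs a APP_DATA_TARGET_TABLE env"
```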
2 changes: 1 addition & 1 deletion requirements/prod.txt
@@ -1,4 +1,4 @@
dune-client==0.3.0
dune-client==1.7.4
psycopg2-binary>=2.9.3
python-dotenv>=0.20.0
requests>=2.28.1
8 changes: 4 additions & 4 deletions src/dune_queries.py
@@ -6,7 +6,7 @@
from copy import copy
from dataclasses import dataclass

from dune_client.query import Query
from dune_client.query import QueryBase
from dune_client.types import QueryParameter


@@ -15,14 +15,14 @@ class QueryData:
"""Stores name and a version of the query for each query."""

name: str
query: Query
query: QueryBase

def __init__(self, name: str, query_id: int, filename: str) -> None:
self.name = name
self.filepath = filename
self.query = Query(query_id, name)
self.query = QueryBase(query_id, name)

def with_params(self, params: list[QueryParameter]) -> Query:
def with_params(self, params: list[QueryParameter]) -> QueryBase:
"""
Copies the query and adds parameters to it, returning the copy.
"""
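
For context (not part of the diff), this is roughly how a QueryData entry is parameterized under the renamed API that dune-client 1.x introduces; the block range values below are illustrative:

```python
# Illustrative sketch: using QueryData after the Query -> QueryBase rename.
from dune_client.types import QueryParameter

from src.dune_queries import QueryData

query_data = QueryData(name="App Hashes", query_id=1610025, filename="app_hashes.sql")
parameterized = query_data.with_params(
    [
        QueryParameter.number_type("BlockFrom", 12153262),
        QueryParameter.number_type("BlockTo", 12345678),
    ]
)
print(parameterized.name, parameterized.query_id)  # a QueryBase copy carrying the parameters
```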
9 changes: 4 additions & 5 deletions src/fetch/dune.py
@@ -7,7 +7,7 @@
import sys

from dune_client.client import DuneClient
from dune_client.query import Query
from dune_client.query import QueryBase
from dune_client.types import DuneRecord
from requests import HTTPError

@@ -25,15 +25,14 @@ class DuneFetcher:

def __init__(
self,
api_key: str,
dune: DuneClient,
) -> None:
"""
Class constructor.
Builds DuneClient from `api_key` along with a logger and FileIO object.
"""
self.dune = DuneClient(api_key)
self.dune = dune

async def fetch(self, query: Query) -> list[DuneRecord]:
async def fetch(self, query: QueryBase) -> list[DuneRecord]:
"""Async Dune Fetcher with some exception handling."""
log.debug(f"Executing {query}")

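
A brief sketch (not from the commit) of the new construction pattern: callers now build the DuneClient themselves and inject it, instead of handing DuneFetcher a raw API key:

```python
# Dependency injection after this change: the caller owns the DuneClient instance.
import os

from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher

dune_client = DuneClient(os.environ["DUNE_API_KEY"])
fetcher = DuneFetcher(dune=dune_client)  # previously: DuneFetcher(api_key=...)
```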
11 changes: 11 additions & 0 deletions src/fetch/orderbook.py
@@ -151,3 +151,14 @@ def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame:
if not barn.empty:
return barn.copy()
return pd.DataFrame()

@classmethod
def get_app_hashes(cls) -> DataFrame:
"""
Fetches all appData hashes and preimages from Prod and Staging DB
"""
app_data_query = open_query("app_hashes.sql")
barn, prod = cls._query_both_dbs(app_data_query, app_data_query)

# We are only interested in unique app data
return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True)
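
A small usage sketch for the new classmethod (assuming the orderbook DB URLs are configured via BARN_DB_URL / PROD_DB_URL as in .env.sample):

```python
# Fetch deduplicated appData hashes and preimages from both orderbook databases.
from src.fetch.orderbook import OrderbookFetcher

app_hashes = OrderbookFetcher.get_app_hashes()
print(f"{len(app_hashes)} unique appData records")
```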
24 changes: 10 additions & 14 deletions src/main.py
@@ -6,8 +6,8 @@
from pathlib import Path

from dotenv import load_dotenv
from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher
from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.tables import SyncTable
@@ -50,36 +50,32 @@ def __init__(self) -> None:
volume_path = Path(os.environ["VOLUME_PATH"])
args = ScriptArgs()
aws = AWSClient.new_from_environment()
dune = DuneClient(os.environ["DUNE_API_KEY"])
orderbook = OrderbookFetcher()

if args.sync_table == SyncTable.APP_DATA:
table = os.environ["APP_DATA_TARGET_TABLE"]
assert table, "APP_DATA sync needs a APP_DATA_TARGET_TABLE env"
asyncio.run(
sync_app_data(
aws,
dune=DuneFetcher(os.environ["DUNE_API_KEY"]),
config=AppDataSyncConfig(
volume_path=volume_path,
missing_files_name="missing_app_hashes.json",
max_retries=int(os.environ.get("APP_DATA_MAX_RETRIES", 3)),
give_up_threshold=int(
os.environ.get("APP_DATA_GIVE_UP_THRESHOLD", 100)
),
),
ipfs_access_key=os.environ["IPFS_ACCESS_KEY"],
orderbook,
dune=dune,
config=AppDataSyncConfig(table),
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.ORDER_REWARDS:
sync_order_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
fetcher=orderbook,
dry_run=args.dry_run,
)
elif args.sync_table == SyncTable.BATCH_REWARDS:
sync_batch_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
fetcher=orderbook,
dry_run=args.dry_run,
)
else:
5 changes: 0 additions & 5 deletions src/sql/app_hash_latest_block.sql

This file was deleted.

25 changes: 5 additions & 20 deletions src/sql/app_hashes.sql
@@ -1,21 +1,6 @@
-- App Hashes: https://dune.com/queries/1610025
-- MIN(first_block_seen) = 12153263
-- Nov 16, 2022: Query takes 4 seconds to run for on full block range
with
app_hashes as (
select
min(call_block_number) first_seen_block,
get_json_object(trade, '$.appData') as app_hash
from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle
lateral view explode(trades) as trade
group by app_hash
)
select
app_hash,
first_seen_block
from app_hashes
where first_seen_block > '{{BlockFrom}}'
and first_seen_block <= '{{BlockTo}}'
-- Selects all known appData hashes and preimages (as string) from the backend database

-- For some additional stats,
-- on this data see https://dune.com/queries/1608286
SELECT
concat('0x',encode(contract_app_data, 'hex')) contract_app_data,
encode(full_app_data, 'escape')
FROM app_data
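
For orientation (values invented for illustration), each row returned by this query pairs the on-chain appData hash with its JSON preimage decoded from bytea:

```python
# Illustrative row shape only; real hashes and preimages come from the backend app_data table.
example_row = {
    "contract_app_data": "0x" + "ab" * 32,  # hex-encoded 32-byte appData hash
    "full_app_data": '{"appCode": "CoW Swap", "metadata": {}}',  # escape-decoded preimage
}
```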
153 changes: 13 additions & 140 deletions src/sync/app_data.py
@@ -1,154 +1,27 @@
"""Main Entry point for app_hash sync"""

from dune_client.file.interface import FileIO
from dune_client.types import DuneRecord
from dune_client.client import DuneClient

from src.fetch.dune import DuneFetcher
from src.fetch.ipfs import Cid
from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.app_data_content import FoundContent, NotFoundContent
from src.models.block_range import BlockRange
from src.models.tables import SyncTable
from src.post.aws import AWSClient
from src.sync.common import last_sync_block
from src.sync.config import SyncConfig, AppDataSyncConfig
from src.sync.record_handler import RecordHandler
from src.sync.upload_handler import UploadHandler
from src.sync.config import AppDataSyncConfig

log = set_log(__name__)


SYNC_TABLE = SyncTable.APP_DATA


class AppDataHandler(RecordHandler): # pylint:disable=too-many-instance-attributes
"""
This class is responsible for consuming new dune records and missing values from previous runs
it attempts to fetch content for them and filters them into "found" and "not found" as necessary
"""

def __init__( # pylint:disable=too-many-arguments
self,
file_manager: FileIO,
new_rows: list[DuneRecord],
block_range: BlockRange,
config: SyncConfig,
ipfs_access_key: str,
missing_file_name: str,
):
super().__init__(block_range, SYNC_TABLE, config)
self.file_manager = file_manager
self.ipfs_access_key = ipfs_access_key

self._found: list[FoundContent] = []
self._not_found: list[NotFoundContent] = []

self.new_rows = new_rows
self.missing_file_name = missing_file_name
try:
self.missing_values = self.file_manager.load_ndjson(missing_file_name)
except FileNotFoundError:
self.missing_values = []

def num_records(self) -> int:
assert len(self.new_rows) == 0, (
"this function call is not allowed until self.new_rows have been processed! "
"call fetch_content_and_filter first"
)
return len(self._found)

async def _handle_new_records(self, max_retries: int) -> None:
# Drain the dune_results into "found" and "not found" categories
self._found, self._not_found = await Cid.fetch_many(
self.new_rows, self.ipfs_access_key, max_retries
)

async def _handle_missing_records(
self, max_retries: int, give_up_threshold: int
) -> None:
found, not_found = await Cid.fetch_many(
self.missing_values, self.ipfs_access_key, max_retries
)
while found:
self._found.append(found.pop())
while not_found:
row = not_found.pop()
app_hash, attempts = row.app_hash, row.attempts
if attempts > give_up_threshold:
log.debug(
f"No content found after {attempts} attempts for {app_hash} assuming NULL."
)
self._found.append(
FoundContent(
app_hash=app_hash,
first_seen_block=row.first_seen_block,
content={},
)
)
else:
self._not_found.append(row)

def write_found_content(self) -> None:
assert len(self.new_rows) == 0, "Must call _handle_new_records first!"
self.file_manager.write_ndjson(
data=[x.as_dune_record() for x in self._found], name=self.content_filename
)
# When not_found is empty, we want to overwrite the file (hence skip_empty=False)
# This happens when number of attempts exceeds GIVE_UP_THRESHOLD
self.file_manager.write_ndjson(
data=[x.as_dune_record() for x in self._not_found],
name=self.missing_file_name,
skip_empty=False,
)

def write_sync_data(self) -> None:
# Only write these if upload was successful.
self.file_manager.write_csv(
data=[{self.config.sync_column: str(self.block_range.block_to)}],
name=self.config.sync_file,
)

async def fetch_content_and_filter(
self, max_retries: int, give_up_threshold: int
) -> None:
"""
Run loop fetching app_data for hashes,
separates into (found and not found), returning the pair.
"""
await self._handle_new_records(max_retries)
log.info(
f"Attempting to recover missing {len(self.missing_values)} records from previous run"
)
await self._handle_missing_records(max_retries, give_up_threshold)


async def sync_app_data(
aws: AWSClient,
dune: DuneFetcher,
orderbook: OrderbookFetcher,
dune: DuneClient,
config: AppDataSyncConfig,
ipfs_access_key: str,
dry_run: bool,
) -> None:
"""App Data Sync Logic"""
block_range = BlockRange(
block_from=last_sync_block(
aws,
table=SYNC_TABLE,
genesis_block=12153262, # First App Hash Block
),
block_to=await dune.latest_app_hash_block(),
)

data_handler = AppDataHandler(
file_manager=FileIO(config.volume_path / str(SYNC_TABLE)),
new_rows=await dune.get_app_hashes(block_range),
block_range=block_range,
config=config,
ipfs_access_key=ipfs_access_key,
missing_file_name=config.missing_files_name,
)
await data_handler.fetch_content_and_filter(
max_retries=config.max_retries, give_up_threshold=config.give_up_threshold
)
UploadHandler(aws, data_handler, table=SYNC_TABLE).write_and_upload_content(dry_run)
hashes = orderbook.get_app_hashes()
if not dry_run:
dune.upload_csv(
data=hashes.to_csv(index=False),
table_name=config.table,
description=config.description,
is_private=False,
)
log.info("app_data sync run completed successfully")
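
Putting the pieces together, the new flow reduces to "query the orderbook, upload a CSV to Dune"; a condensed sketch assuming a valid DUNE_API_KEY and the default table name from the config:

```python
# Condensed sketch of the new sync_app_data flow: no IPFS fetching, no block-range
# bookkeeping; just orderbook query -> CSV upload to a Dune table.
import os

from dune_client.client import DuneClient

from src.fetch.orderbook import OrderbookFetcher
from src.sync.config import AppDataSyncConfig

config = AppDataSyncConfig()  # defaults to table "app_data_test"
hashes = OrderbookFetcher.get_app_hashes()
DuneClient(os.environ["DUNE_API_KEY"]).upload_csv(
    data=hashes.to_csv(index=False),
    table_name=config.table,
    description=config.description,
    is_private=False,
)
```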
16 changes: 8 additions & 8 deletions src/sync/config.py
@@ -19,12 +19,12 @@ class SyncConfig:


@dataclass
class AppDataSyncConfig(SyncConfig):
"""Additional data field for app data sync."""
class AppDataSyncConfig:
"""Configuration for app data sync."""

# Maximum number of retries on a single run
max_retries: int = 3
# Total number of accumulated attempts before we assume no content
give_up_threshold: int = 100
# Persisted file where we store the missing results and number of attempts.
missing_files_name: str = "missing_app_hashes.json"
# The name of the table to upload to
table: str = "app_data_test"
# Description of the table (for creation)
description: str = (
"Table containing known CoW Protocol appData hashes and their pre-images"
)
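
Since the retry and give-up knobs left with the IPFS logic, the config now carries only the Dune table target and its description; for example (the mainnet table name is taken from .env.sample):

```python
# The only remaining knobs: which Dune table to upload to, and its description.
from src.sync.config import AppDataSyncConfig

test_config = AppDataSyncConfig()                             # "app_data_test" (default)
mainnet_config = AppDataSyncConfig(table="app_data_mainnet")  # value suggested in .env.sample
```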