Skip to content

Commit

Permalink
ready for review
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhagarwal03 committed Aug 9, 2024
1 parent 5772c8b commit 12bcd51
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 117 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pytest==7.4.0
setuptools
pandas-stubs
types-psycopg2
types-requests
types-requests
moralis
15 changes: 15 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,18 @@
SDAI_TOKEN_ADDRESS = Web3.to_checksum_address(
"0x83F20F44975D03b1b09e64809B757c47f942BEeA"
)

# Time limit after which Coingecko Token List is re-fetched (in seconds)
TOKEN_LIST_RELOAD_TIME = 86400

# Time in seconds of 45 hours
COINGECKO_TIME_LIMIT = 162000

# Buffer time interval to allow 5-minutely Coingecko prices to be fetched
BUFFER_TIME = 600

# Dune query for fetching prices is set to LIMIT 1, i.e. it will return a single price
FETCH_PRICE_QUERY_ID = 3935228

# Dune Query 3935228 uses an end_timestamp to limit results
QUERY_BUFFER_TIME = 100
15 changes: 9 additions & 6 deletions src/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
check_db_connection,
logger,
)
from src.coingecko_pricing import CoingeckoPriceProvider
from src.dune_pricing import DunePriceProvider
from src.moralis_pricing import MoralisPriceProvider
from src.price_providers.coingecko_pricing import CoingeckoPriceProvider
from src.price_providers.dune_pricing import DunePriceProvider
from src.price_providers.moralis_pricing import MoralisPriceProvider


def get_start_block(
Expand Down Expand Up @@ -203,7 +203,9 @@ def write_prices(
logger.error("Error inserting record: %s", e)


def get_price(PriceProviders: List, block_number, token_address) -> Optional[float]:
def get_price(
PriceProviders: List, block_number: int, token_address: str
) -> Optional[Tuple[float, str]]:
for provider in PriceProviders:
try:
price = provider.get_price(block_number, token_address)
Expand Down Expand Up @@ -264,10 +266,11 @@ def process_transactions(chain_name: str) -> None:
token_address_bytes,
imbalance,
)
price, source = get_price(
price_data = get_price(
api_prices, block_number, token_address
)
if price is not None:
if price_data:
price, source = price_data
write_prices(
chain_name,
source,
Expand Down
42 changes: 0 additions & 42 deletions src/dune_pricing.py

This file was deleted.

33 changes: 0 additions & 33 deletions src/moralis_pricing.py

This file was deleted.

Empty file added src/price_providers/__init__.py
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -6,65 +6,64 @@
from web3 import Web3
from src.config import logger, get_web3_instance
from src.helper_functions import get_finalized_block_number
from src.constants import NATIVE_ETH_TOKEN_ADDRESS, WETH_TOKEN_ADDRESS
from src.constants import (
NATIVE_ETH_TOKEN_ADDRESS,
WETH_TOKEN_ADDRESS,
TOKEN_LIST_RELOAD_TIME,
COINGECKO_TIME_LIMIT,
BUFFER_TIME,
)

coingecko_api_key = os.getenv("COINGECKO_API_KEY")


class CoingeckoPriceProvider:
def __init__(self):
"""
Purpose of this class is to fetch historical token prices from Coingecko.
"""

def __init__(self) -> None:
self.web3 = get_web3_instance()
self.filtered_token_list = self.load_filtered_token_list()
self.filtered_token_list = self.fetch_coingecko_list()
self.last_reload_time = time.time() # current time in seconds since epoch

def load_filtered_token_list(self) -> List[Dict]:
"""
Load and filter the list of tokens from CoinGecko API.
def fetch_coingecko_list(self) -> List[Dict]:
"""
tokens_data = self.fetch_coingecko_list()
tokens_list = json.loads(tokens_data)
return self.filter_ethereum_tokens(tokens_list)

def fetch_coingecko_list(self) -> str:
"""
Fetch the list of tokens from CoinGecko with platform information.
Returns:
str: The raw JSON response from the CoinGecko API.
Fetch and filter the list of tokens (currently filters only Ethereum)
from the Coingecko API.
"""
url = (
f"https://pro-api.coingecko.com/api/v3/coins/"
f"list?include_platform=true&status=active"
)
headers = {
"accept": "application/json",
"x-cg-pro-api-key": coingecko_api_key,
}
response = requests.get(url, headers=headers)
return response.text
if coingecko_api_key:
headers["x-cg-pro-api-key"] = coingecko_api_key

def filter_ethereum_tokens(self, coingecko_tokens_list: List[Dict]) -> List[Dict]:
"""
filter tokens to include only those that are on ethereum.
"""
response = requests.get(url, headers=headers)
tokens_list = json.loads(response.text)
return [
{"id": item["id"], "platforms": {"ethereum": item["platforms"]["ethereum"]}}
for item in coingecko_tokens_list
for item in tokens_list
if "ethereum" in item["platforms"]
]

def check_reload_token_list(self) -> bool:
"""check if the token list needs to be reloaded based on time."""
current_time = time.time()
elapsed_time = current_time - self.last_reload_time
# checks every 24 hours, converted to seconds
return elapsed_time >= 86400
# checks for set elapsed time (currently 24 hours), in seconds
return elapsed_time >= TOKEN_LIST_RELOAD_TIME

def get_token_id_by_address(self, token_address):
"""Check to see if an updated token list is required.
Get the token ID using token address."""
def get_token_id_by_address(self, token_address) -> Optional[str]:
"""
Check to see if an updated token list is required.
Get the token ID by searching for the token address.
"""
if self.check_reload_token_list():
self.filtered_token_list = self.load_filtered_token_list()
self.filtered_token_list = self.fetch_coingecko_list()
self.last_reload_time = (
time.time()
) # update the last reload time to current time
Expand Down Expand Up @@ -109,14 +108,12 @@ def price_not_retrievable(self, block_start_timestamp: int) -> bool:
This function checks if the time elapsed between the latest block and block being processed
is less than 2 days, which is coingecko's time frame for fetching 5-minutely data.
"""
# Time in seconds of 45 hours.
COINGECKO_TIME_LIMIT = 162000
newest_block_timestamp = self.web3.eth.get_block(
get_finalized_block_number(self.web3)
)["timestamp"]
return (newest_block_timestamp - block_start_timestamp) > COINGECKO_TIME_LIMIT

def get_price(self, block_number: int, token_address: str):
def get_price(self, block_number: int, token_address: str) -> Optional[float]:
"""
Function returns coingecko price for a token address,
closest to and at least as large as the block timestamp for a given tx hash.
Expand All @@ -137,13 +134,15 @@ def get_price(self, block_number: int, token_address: str):
# We need to provide a sufficient buffer time for fetching 5-minutely prices from coingecko.
# If too short, it's possible that no price may be returned. We use the first value returned,
# which would be closest to block timestamp
BUFFER_TIME = 600
block_end_timestamp = block_start_timestamp + BUFFER_TIME

# Coingecko requires a lowercase token address
token_address = token_address.lower()
token_id = self.get_token_id_by_address(token_address)
if not token_id:
logger.warning(f"Token ID not found for the given address: {token_address}")
logger.warning(
f"Token ID not found for the given address on Coingecko: {token_address}"
)
return None
try:
api_price = self.fetch_api_price(
Expand Down
56 changes: 56 additions & 0 deletions src/price_providers/dune_pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import Optional
import dotenv, os
from dune_client.types import QueryParameter
from dune_client.client import DuneClient
from dune_client.query import QueryBase
from src.config import get_web3_instance, get_logger
from src.constants import FETCH_PRICE_QUERY_ID, QUERY_BUFFER_TIME

dotenv.load_dotenv()
dune_api_key = os.getenv("DUNE_API_KEY")
dune = DuneClient.from_env()


class DunePriceProvider:
"""
Purpose of this class is to fetch historical token prices from Dune.
"""

def __init__(self) -> None:
self.web3 = get_web3_instance()
self.logger = get_logger()

def get_price(self, block_number: int, token_address: str) -> Optional[float]:
"""
Function returns Dune price for a token address,
closest to and at least as large as the block timestamp for a given tx hash.
"""
try:
start_timestamp = self.web3.eth.get_block(block_number)["timestamp"]
end_timestamp = start_timestamp + QUERY_BUFFER_TIME
query = QueryBase(
name="ERC20 Prices",
query_id=FETCH_PRICE_QUERY_ID,
params=[
QueryParameter.text_type(name="token_address", value=token_address),
QueryParameter.number_type(
name="start_timestamp", value=start_timestamp
),
QueryParameter.number_type(
name="end_timestamp", value=end_timestamp
),
],
)
result = dune.run_query(query=query)
if result.result.rows:
row = result.result.rows[0]
price = row.get("price")
if price is not None:
return price
# No valid price found
return None
except KeyError as e:
self.logger.error(f"Key error occurred: {e}")
except Exception as e:
self.logger.error(f"Unknown error occurred: {e}")
return None
51 changes: 51 additions & 0 deletions src/price_providers/moralis_pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Optional
from moralis import evm_api
from src.config import get_logger
import os, dotenv

dotenv.load_dotenv()


class MoralisPriceProvider:
"""
Purpose of this class is to fetch historical token prices using the Moralis API.
"""

def __init__(self) -> None:
self.logger = get_logger()

@staticmethod
def wei_to_eth(price: str) -> Optional[float]:
"""Function to convert string price to float price in ETH."""
float_price = float(price) if isinstance(price, str) else None
if isinstance(float_price, float):
return float_price / 10**18
return None

def get_price(self, block_number: int, token_address: str) -> Optional[float]:
"""
Function returns Moralis price given a block number and token_address.
Price returned is closest to and at least as large as block timestamp.
"""
try:
params = {
"chain": "eth",
"address": token_address,
"to_block": block_number,
}
result = evm_api.token.get_token_price(
api_key=os.getenv("MORALIS_KEY"),
params=params,
)
if "nativePrice" in result and "value" in result["nativePrice"]:
# return price in ETH
return self.wei_to_eth(result["nativePrice"]["value"])
else:
raise KeyError(" 'nativePrice' or 'value' not found in the result.")
except KeyError as e:
self.logger.warning(f"Error: {e}")
except Exception as e:
self.logger.warning(
f"Error: Likely the token: {token_address} was not found."
)
return None

0 comments on commit 12bcd51

Please sign in to comment.