Skip to content

Commit

Permalink
Add alpha API
Browse files Browse the repository at this point in the history
  • Loading branch information
ppsimatikas committed Oct 18, 2024
1 parent d1d276d commit 3b28247
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/chainbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional

from src.sql import ChainbaseSQL
from src.sql_alpha import ChainbaseSQLAlpha

MISSING_API_KEY_ERROR = """
Expand Down Expand Up @@ -50,3 +51,4 @@ def __init__(self, api_key: Optional[str] = None):
raise ValueError(MISSING_API_KEY_ERROR)

self.sql = ChainbaseSQL(api_key)
self.sql_alpha = ChainbaseSQLAlpha(api_key)
58 changes: 42 additions & 16 deletions src/chainbase_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict
from typing import Dict, Optional

import requests
from requests import Response

DEFAULT_ERROR = "An unexpected error occurred while calling the Chainbase API."

Expand All @@ -17,27 +18,48 @@ class ChainbaseAPI:
headers (dict): HTTP headers to include with requests, including the API key.
"""

def __init__(self, url: str, api_key: str):
def __init__(self, url: str, api_key: str, success_code: int = 0):
"""
Initializes the ChainbaseAPI client with the specified API URL and key.
Args:
url (str): The base URL for the Chainbase API.
api_key (str): The API key for authenticating with the Chainbase API.
success_code (int): The response success code, defaults to 0.
"""
self.url = url
self.success_code = success_code

self.headers = {
"x-api-key": api_key,
"Content-Type": "application/json; charset=utf-8",
}

def post(self, body: any) -> Dict[str, any]:
def _process_response(self, res: Response):
res.raise_for_status()
json_res = res.json()

if json_res.get("code") != self.success_code:
error = json_res.get("error", DEFAULT_ERROR) or DEFAULT_ERROR
raise Exception(error)

data = json_res.get("data")

if data is None:
raise Exception(DEFAULT_ERROR)

if data and not isinstance(data, list) and data.get("err_msg"):
raise Exception(data.get("err_msg", DEFAULT_ERROR))

return json_res

def post(self, body: any, url: Optional[str] = None) -> Dict[str, any]:
"""
Sends a POST request to the Chainbase API with the provided body.
Args:
body (Any): The payload to be sent in the POST request.
url (str): If we want to call a different url
Returns:
Dict[str, Any]: The JSON response from the API.
Expand All @@ -46,20 +68,24 @@ def post(self, body: any) -> Dict[str, any]:
Exception: If the API returns an error code or if there's an issue with
the response data structure.
"""
res = requests.post(self.url, json=body, headers=self.headers)
res.raise_for_status()
json_res = res.json()
url = url or self.url
res = requests.post(url, json=body, headers=self.headers)
return self._process_response(res)

if json_res.get("code") != 0:
error = json_res.get("error", DEFAULT_ERROR) or DEFAULT_ERROR
raise Exception(error)

data = json_res.get("data")
def get(self, url: Optional[str]) -> Dict[str, any]:
"""
Sends a GET request to the Chainbase API.
if data is None:
raise Exception(DEFAULT_ERROR)
Args:
url (str): If we want to call a different url
if data and data.get("err_msg"):
raise Exception(data.get("err_msg", DEFAULT_ERROR))
Returns:
Dict[str, Any]: The JSON response from the API.
return json_res
Raises:
Exception: If the API returns an error code or if there's an issue with
the response data structure.
"""
url = url or self.url
res = requests.get(url, headers=self.headers)
return self._process_response(res)
2 changes: 1 addition & 1 deletion src/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, api_key: str):
Args:
api_key (str): The API key for authenticating with the Chainbase API.
"""
super().__init__("https://api.chainbase.online/v1/dw/query", api_key)
super().__init__("https://api.chainbase.online/v1/dw/query", api_key, 0)

def _get_all_pages(
self, body, results: Optional[List[Dict[str, str]]] = None
Expand Down
150 changes: 150 additions & 0 deletions src/sql_alpha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import time
from typing import Dict, List, Tuple

import pandas as pd

from src.chainbase_api import ChainbaseAPI

MAP_TYPES = {
"bigint": "int64",
"varchar": "object",
"timestamp": "datetime64[ns]",
"integer": "object",
}


class ChainbaseSQLAlpha(ChainbaseAPI):
"""
Extension of the ChainbaseAPI class, specifically for handling SQL queries
and returning results either as raw data or as a pandas DataFrame.
Inherits from ChainbaseAPI and uses its methods for making HTTP requests to
the Chainbase API.
Methods:
query: Executes an SQL query and returns metadata and results.
query_pandas: Executes an SQL query and returns the results as a pandas DataFrame.
"""

def __init__(self, api_key: str):
"""
Initializes the ChainbaseSQL client with the provided API key.
Args:
api_key (str): The API key for authenticating with the Chainbase API.
"""
super().__init__("https://api.chainbase.com/api/v1/query/execute", api_key, 200)

def _execute(self, sql: str) -> str:
"""
Internal method to initiate a query execution
Args:
sql: The query sql to execute.
Returns:
The execution id
"""
return self.post({"sql": sql})["data"][0]["executionId"]

def _check_status(self, execution_id: str) -> str:
"""
Internal method to check the status of a query execution
Args:
execution_id: The query execution id.
Returns:
True if it is done, False if it is not
"""
url = f"https://api.chainbase.com/api/v1/execution/{execution_id}/status"
data = self.get(url)["data"][0]
status = data["status"]

if status == "FAILED":
raise Exception(data["message"])

return status == "FINISHED"

def _get_results(self, execution_id: str) -> str:
"""
Internal method to check the status of a query execution
Args:
execution_id: The query execution id.
Returns:
A tuple containing query metadata and results as a list of dictionaries.
"""
url = f"https://api.chainbase.com/api/v1/execution/{execution_id}/results"
res = self.get(url)["data"]
return res["columns"], res["data"]

def query(self, sql: str) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
"""
Executes an SQL query against the Chainbase API.
Args:
sql (str): The SQL query to be executed.
Returns:
A tuple containing query metadata and results as a list of dictionaries.
"""
execution_id = self._execute(sql)

max_status_checks = 10
i = 0
while i < max_status_checks:
if self._check_status(execution_id):
break
else:
time.sleep(1)
i += 1

if i >= max_status_checks:
raise Exception("Max retries reached.")

return self._get_results(execution_id)

def query_pandas(self, sql: str) -> pd.DataFrame:
"""
Executes an SQL query and returns the results as a pandas DataFrame.
Args:
sql (str): The SQL query to be executed.
Returns:
pandas DataFrame containing the query results.
"""
metadata, results = self.query(sql)

types = self._metadata_to_pandas_types(metadata)

# return pd.DataFrame(results, columns=types.keys()).astype(types)
return pd.DataFrame(results, columns=types.keys())

@staticmethod
def _metadata_to_pandas_types(metadata: Dict[str, str]):
"""
Converts query metadata to pandas DataFrame types.
Args:
metadata (Dict[str, str]): Metadata about the columns in the query result.
Returns:
A dictionary mapping column names to pandas data types.
"""
return {
m["name"]: MAP_TYPES.get(ChainbaseSQLAlpha._get_type(m["type"]), None)
for m in metadata
}

@staticmethod
def _get_type(t: str):
if t.startswith("varchar"):
return "varchar"

if t not in MAP_TYPES:
print("NOT EXISTS: ", t)

return t
2 changes: 1 addition & 1 deletion tests/e2e/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@
"nft.nft_collections",
"nft.nft_mints",
"nft.nft_transactions",
"bitcoin.transactions",
"bitcoin.blocks",
"bitcoin.inputs",
"bitcoin.outputs",
"bitcoin.transactions",
"base.blocks",
"base.contracts",
"base.token_metas",
Expand Down

0 comments on commit 3b28247

Please sign in to comment.