From 3b28247580feeb64dba1813a8161d87cd538d4b8 Mon Sep 17 00:00:00 2001 From: panosdb Date: Fri, 18 Oct 2024 10:56:07 +0300 Subject: [PATCH] Add alpha API --- src/chainbase.py | 2 + src/chainbase_api.py | 58 ++++++++++++----- src/sql.py | 2 +- src/sql_alpha.py | 150 +++++++++++++++++++++++++++++++++++++++++++ tests/e2e/tables.py | 2 +- 5 files changed, 196 insertions(+), 18 deletions(-) create mode 100644 src/sql_alpha.py diff --git a/src/chainbase.py b/src/chainbase.py index ea92358..ff4b5a8 100644 --- a/src/chainbase.py +++ b/src/chainbase.py @@ -2,6 +2,7 @@ from typing import Optional from src.sql import ChainbaseSQL +from src.sql_alpha import ChainbaseSQLAlpha MISSING_API_KEY_ERROR = """ @@ -50,3 +51,4 @@ def __init__(self, api_key: Optional[str] = None): raise ValueError(MISSING_API_KEY_ERROR) self.sql = ChainbaseSQL(api_key) + self.sql_alpha = ChainbaseSQLAlpha(api_key) diff --git a/src/chainbase_api.py b/src/chainbase_api.py index 3326e71..e41b953 100644 --- a/src/chainbase_api.py +++ b/src/chainbase_api.py @@ -1,6 +1,7 @@ -from typing import Dict +from typing import Dict, Optional import requests +from requests import Response DEFAULT_ERROR = "An unexpected error occurred while calling the Chainbase API." @@ -17,27 +18,48 @@ class ChainbaseAPI: headers (dict): HTTP headers to include with requests, including the API key. """ - def __init__(self, url: str, api_key: str): + def __init__(self, url: str, api_key: str, success_code: int = 0): """ Initializes the ChainbaseAPI client with the specified API URL and key. Args: url (str): The base URL for the Chainbase API. api_key (str): The API key for authenticating with the Chainbase API. + success_code (int): The response success code, defaults to 0. """ self.url = url + self.success_code = success_code self.headers = { "x-api-key": api_key, "Content-Type": "application/json; charset=utf-8", } - def post(self, body: any) -> Dict[str, any]: + def _process_response(self, res: Response): + res.raise_for_status() + json_res = res.json() + + if json_res.get("code") != self.success_code: + error = json_res.get("error", DEFAULT_ERROR) or DEFAULT_ERROR + raise Exception(error) + + data = json_res.get("data") + + if data is None: + raise Exception(DEFAULT_ERROR) + + if data and not isinstance(data, list) and data.get("err_msg"): + raise Exception(data.get("err_msg", DEFAULT_ERROR)) + + return json_res + + def post(self, body: any, url: Optional[str] = None) -> Dict[str, any]: """ Sends a POST request to the Chainbase API with the provided body. Args: body (Any): The payload to be sent in the POST request. + url (str): If we want to call a different url Returns: Dict[str, Any]: The JSON response from the API. @@ -46,20 +68,24 @@ def post(self, body: any) -> Dict[str, any]: Exception: If the API returns an error code or if there's an issue with the response data structure. """ - res = requests.post(self.url, json=body, headers=self.headers) - res.raise_for_status() - json_res = res.json() + url = url or self.url + res = requests.post(url, json=body, headers=self.headers) + return self._process_response(res) - if json_res.get("code") != 0: - error = json_res.get("error", DEFAULT_ERROR) or DEFAULT_ERROR - raise Exception(error) - - data = json_res.get("data") + def get(self, url: Optional[str]) -> Dict[str, any]: + """ + Sends a GET request to the Chainbase API. - if data is None: - raise Exception(DEFAULT_ERROR) + Args: + url (str): If we want to call a different url - if data and data.get("err_msg"): - raise Exception(data.get("err_msg", DEFAULT_ERROR)) + Returns: + Dict[str, Any]: The JSON response from the API. - return json_res + Raises: + Exception: If the API returns an error code or if there's an issue with + the response data structure. + """ + url = url or self.url + res = requests.get(url, headers=self.headers) + return self._process_response(res) diff --git a/src/sql.py b/src/sql.py index 9f8ec8e..1ae4e8c 100644 --- a/src/sql.py +++ b/src/sql.py @@ -51,7 +51,7 @@ def __init__(self, api_key: str): Args: api_key (str): The API key for authenticating with the Chainbase API. """ - super().__init__("https://api.chainbase.online/v1/dw/query", api_key) + super().__init__("https://api.chainbase.online/v1/dw/query", api_key, 0) def _get_all_pages( self, body, results: Optional[List[Dict[str, str]]] = None diff --git a/src/sql_alpha.py b/src/sql_alpha.py new file mode 100644 index 0000000..04990fa --- /dev/null +++ b/src/sql_alpha.py @@ -0,0 +1,150 @@ +import time +from typing import Dict, List, Tuple + +import pandas as pd + +from src.chainbase_api import ChainbaseAPI + +MAP_TYPES = { + "bigint": "int64", + "varchar": "object", + "timestamp": "datetime64[ns]", + "integer": "object", +} + + +class ChainbaseSQLAlpha(ChainbaseAPI): + """ + Extension of the ChainbaseAPI class, specifically for handling SQL queries + and returning results either as raw data or as a pandas DataFrame. + + Inherits from ChainbaseAPI and uses its methods for making HTTP requests to + the Chainbase API. + + Methods: + query: Executes an SQL query and returns metadata and results. + query_pandas: Executes an SQL query and returns the results as a pandas DataFrame. + """ + + def __init__(self, api_key: str): + """ + Initializes the ChainbaseSQL client with the provided API key. + + Args: + api_key (str): The API key for authenticating with the Chainbase API. + """ + super().__init__("https://api.chainbase.com/api/v1/query/execute", api_key, 200) + + def _execute(self, sql: str) -> str: + """ + Internal method to initiate a query execution + + Args: + sql: The query sql to execute. + + Returns: + The execution id + """ + return self.post({"sql": sql})["data"][0]["executionId"] + + def _check_status(self, execution_id: str) -> str: + """ + Internal method to check the status of a query execution + + Args: + execution_id: The query execution id. + + Returns: + True if it is done, False if it is not + """ + url = f"https://api.chainbase.com/api/v1/execution/{execution_id}/status" + data = self.get(url)["data"][0] + status = data["status"] + + if status == "FAILED": + raise Exception(data["message"]) + + return status == "FINISHED" + + def _get_results(self, execution_id: str) -> str: + """ + Internal method to check the status of a query execution + + Args: + execution_id: The query execution id. + + Returns: + A tuple containing query metadata and results as a list of dictionaries. + """ + url = f"https://api.chainbase.com/api/v1/execution/{execution_id}/results" + res = self.get(url)["data"] + return res["columns"], res["data"] + + def query(self, sql: str) -> Tuple[Dict[str, str], List[Dict[str, str]]]: + """ + Executes an SQL query against the Chainbase API. + + Args: + sql (str): The SQL query to be executed. + + Returns: + A tuple containing query metadata and results as a list of dictionaries. + """ + execution_id = self._execute(sql) + + max_status_checks = 10 + i = 0 + while i < max_status_checks: + if self._check_status(execution_id): + break + else: + time.sleep(1) + i += 1 + + if i >= max_status_checks: + raise Exception("Max retries reached.") + + return self._get_results(execution_id) + + def query_pandas(self, sql: str) -> pd.DataFrame: + """ + Executes an SQL query and returns the results as a pandas DataFrame. + + Args: + sql (str): The SQL query to be executed. + + Returns: + pandas DataFrame containing the query results. + """ + metadata, results = self.query(sql) + + types = self._metadata_to_pandas_types(metadata) + + # return pd.DataFrame(results, columns=types.keys()).astype(types) + return pd.DataFrame(results, columns=types.keys()) + + @staticmethod + def _metadata_to_pandas_types(metadata: Dict[str, str]): + """ + Converts query metadata to pandas DataFrame types. + + Args: + metadata (Dict[str, str]): Metadata about the columns in the query result. + + Returns: + A dictionary mapping column names to pandas data types. + """ + return { + m["name"]: MAP_TYPES.get(ChainbaseSQLAlpha._get_type(m["type"]), None) + for m in metadata + } + + @staticmethod + def _get_type(t: str): + if t.startswith("varchar"): + return "varchar" + + if t not in MAP_TYPES: + print("NOT EXISTS: ", t) + + return t diff --git a/tests/e2e/tables.py b/tests/e2e/tables.py index 48182b0..3ecbdec 100644 --- a/tests/e2e/tables.py +++ b/tests/e2e/tables.py @@ -81,10 +81,10 @@ "nft.nft_collections", "nft.nft_mints", "nft.nft_transactions", + "bitcoin.transactions", "bitcoin.blocks", "bitcoin.inputs", "bitcoin.outputs", - "bitcoin.transactions", "base.blocks", "base.contracts", "base.token_metas",