Skip to content

Commit

Permalink
35 create a ticker abstraction, caching and retries (#36)
Browse files Browse the repository at this point in the history
* added helper methods to get access to data from alpha vantage

* adding ticker abstraction to help with retrieving data from alpha vantage

* adding tests to verify a single client or creating multiple clients will pass all our integration tests

* fixed typo in client

* added tests for accounting reports. added logic to correlate accounting reports

* added cash flow to accounting reports

* removed initializing dict in method signature

* added fixed for real_gdp(event). It was defining {} in the signature and this can lead to unexpected results. thus the tests had to change slightly

* added markers for new ticker integration tests

* added limit to cache and made it configurable. renamed use_cache() to use_simple_cache() so it's clear to devs that it's a simple cache strategy.
  • Loading branch information
xrgarcia authored Jul 13, 2022
1 parent 4a56ae0 commit f2b4d54
Show file tree
Hide file tree
Showing 10 changed files with 1,063 additions and 110 deletions.
1 change: 1 addition & 0 deletions alphavantage_api_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from alphavantage_api_client.client import AlphavantageClient
from alphavantage_api_client.models import GlobalQuote, Quote, AccountingReport, CompanyOverview, RealGDP, \
CsvNotSupported
from alphavantage_api_client.ticker import Ticker
178 changes: 148 additions & 30 deletions alphavantage_api_client/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

import requests
import os
import configparser
Expand All @@ -7,6 +9,9 @@
CsvNotSupported
import copy
import logging
import hashlib
from typing import Optional


class ApiKeyNotFound(Exception):

Expand All @@ -16,6 +21,12 @@ def __init__(self, message: str):

class AlphavantageClient:
def __init__(self):
self.__total_calls__ = 0
self.__max_cache_size__ = 0
self.__retry__ = False
self.__first_successful_attempt__ = 0
self.__use_cache__ = False
self.__cache__ = {}
# try to get api key from USER_PROFILE/.alphavantage
alphavantage_config_file_path = f'{os.path.expanduser("~")}{os.path.sep}.alphavantage'
msg = {"method": "__init__", "action": f"{alphavantage_config_file_path} config file found"}
Expand Down Expand Up @@ -76,11 +87,45 @@ def __create_api_request_from__(self, defaults: dict, event: dict):
Returns:
:rtype: dict
"""
json_request = event.copy()
if event is not None:
json_request = event.copy()
else:
json_request = {}
self.__inject_values__(defaults, json_request)

return json_request

def should_retry_once(self, retry: bool = True):
self.__retry__ = retry

return self

def use_simple_cache(self, use_cache: bool = True, max_cache_size: int = 100):
"""
First in First Out Cache
Args:
use_cache:
max_cache_size: Max size of the cache.
Returns:
"""
self.__use_cache__ = use_cache
self.__max_cache_size__ = max_cache_size

return self

def get_internal_metrics(self) -> dict:
total_calls = self.__total_calls__
retry = self.__retry__
first_successful_attempt = self.__first_successful_attempt__
metrics = {
"total_calls": total_calls,
"retry": retry,
"first_successful_attempt": first_successful_attempt
}
return metrics

def with_api_key(self, api_key: str):
"""Specify the API Key when you are storing it somewhere other than in ini file or environment variable
Expand Down Expand Up @@ -114,7 +159,7 @@ def get_global_quote(self, event: dict) -> GlobalQuote:
"function": "GLOBAL_QUOTE"
}
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return GlobalQuote.parse_obj(json_response)

Expand All @@ -140,7 +185,7 @@ def get_intraday_quote(self, event: dict) -> Quote:
"interval": "60min", "slice": "year1month1",
"outputsize": "compact"}
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return Quote.parse_obj(json_response)

Expand All @@ -166,7 +211,19 @@ def get_income_statement(self, event: dict) -> AccountingReport:
if event.get("datatype") == "csv":
raise CsvNotSupported(defaults.get("function"), event)
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return AccountingReport.parse_obj(json_response)

def get_balance_sheet(self, event: dict) -> AccountingReport:
defaults = {
"function": "BALANCE_SHEET",
"datatype": "json"
}
if event.get("datatype") == "csv":
raise CsvNotSupported(defaults.get("function"), event)
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return AccountingReport.parse_obj(json_response)

Expand All @@ -190,7 +247,7 @@ def get_cash_flow(self, event: dict) -> AccountingReport:
if event.get("datatype") == "csv":
raise CsvNotSupported(defaults.get("function"), event)
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return AccountingReport.parse_obj(json_response)

Expand All @@ -214,7 +271,7 @@ def get_earnings(self, event: dict) -> AccountingReport:
if event.get("datatype") == "csv":
raise CsvNotSupported(defaults.get("function"), event)
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return AccountingReport.parse_obj(json_response)

Expand All @@ -237,7 +294,7 @@ def get_company_overview(self, event: dict) -> CompanyOverview:
if event.get("datatype") == "csv":
raise CsvNotSupported(defaults.get("function"), event)
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return CompanyOverview.parse_obj(json_response)

Expand All @@ -261,11 +318,11 @@ def get_crypto_intraday(self, event: dict) -> Quote:
"outputsize": "compact"
}
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return Quote.parse_obj(json_response)

def get_real_gdp(self, event: dict={}) -> RealGDP:
def get_real_gdp(self, event: dict = None) -> RealGDP:
"""
This API returns the annual and quarterly Real GDP of the United States.
Expand All @@ -283,7 +340,7 @@ def get_real_gdp(self, event: dict={}) -> RealGDP:
"datatype": "json"
}
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)

return RealGDP.parse_obj(json_response)

Expand All @@ -304,12 +361,12 @@ def get_technical_indicator(self, event: dict) -> Quote:
"datatype": "json"
}
json_request = self.__create_api_request_from__(defaults, event)
json_response = self.get_data_from_alpha_vantage(json_request)
json_response = self.get_data_from_alpha_vantage(json_request, self.__retry__)
json_response["indicator"] = event.get("function")

return Quote.parse_obj(json_response)

def get_data_from_alpha_vantage(self, event: dict) -> dict:
def get_data_from_alpha_vantage(self, event: dict, should_retry: bool = False) -> dict:
"""
This is the underlying function that talks to alphavantage api. Feel free to pass in any parameters supported
by the api. You will receive a dictionary with the response from the web api. In addition, you will obtain
Expand All @@ -321,26 +378,64 @@ def get_data_from_alpha_vantage(self, event: dict) -> dict:
:rtype: dict
"""
# validate api key and insert into the request if needed
checks = ValidationRuleChecks().from_customer_request(event)
# get api key if not provided
if checks.expect_api_key_in_event().failed():
event["apikey"] = self.__api_key__ # assume they passed to builder method.
elif self.__api_key__ is None or len(self.__api_key__) == 0 or "apikey" not in event \
or not event.get("apikey"): # consumer didn't tell me where to get api key
raise ApiKeyNotFound(
"You must call client.with_api_key([api_key]), create config file in your profile (i.e. ~/.alphavantage) or event[api_key] = [your api key] before retrieving data from alphavantage")
self.__validate_api_key__(checks, event)

# create a version of the event without api key
loggable_event = copy.deepcopy(event)
loggable_event.pop("apikey")

# check cache if allowed
if self.__use_cache__:
results = self.__get_item_from_cache__(loggable_event)
logging.info(f"Found item in cache: {results}")
if results is not None:
return results

# fetch data from API
url = self.__build_url_from_args__(event)
r = requests.get(url)
checks.with_response(r)
self.__fetch_data__(checks, event, loggable_event)
requested_data = {}
logging.info(json.dumps({"method": "get_data_from_alpha_vantage", "action": "response_from_alphavantage"
, "status_code": r.status_code, "data": r.text, "event": loggable_event}))

# hydrate the response
self.__hydrate_request__(requested_data, checks, event, should_retry)

# retry once if allowed and needed
if checks.expect_limit_not_reached().passed() and should_retry:
self.__sleep__()
result = self.get_data_from_alpha_vantage(event, False)
self.__first_successful_attempt__ = time.perf_counter()
return result

# not all calls will have a symbol in the call to alphavantage.... if so we can to capture it.
if "symbol" in event:
requested_data['symbol'] = event['symbol']

# put into cache if allowed
if self.__use_cache__:
self.__put_item_into_cache(loggable_event, requested_data)

logging.info(json.dumps({"method": "get_data_from_alpha_vantage"
, "action": "return_value", "data": requested_data, "event": loggable_event}))

return requested_data

def __put_item_into_cache(self, event, results):

if len(self.__cache__) >= self.__max_cache_size__:
self.__cache__.clear()

hash_str = json.dumps(event, sort_keys=True)
self.__cache__[hash_str] = results

def __get_item_from_cache__(self, event):
hash_str = json.dumps(event, sort_keys=True)
if hash_str in self.__cache__:
return self.__cache__[hash_str]

return None

def __hydrate_request__(self, requested_data: dict, checks: ValidationRuleChecks, event: dict, should_retry: bool):
# verify request worked correctly and build response
# gotta check if consumer request json or csv, so we can parse the output correctly
requested_data['success'] = checks.expect_successful_response().passed() # successful csv response
Expand All @@ -357,10 +452,33 @@ def get_data_from_alpha_vantage(self, event: dict) -> dict:
if checks.expect_csv_datatype().expect_successful_response().passed(): # successful csv response
requested_data['csv'] = checks.get_obj()

# not all calls will have symbol in the call to alphavantage.... if so we can to capture it.
if "symbol" in event:
requested_data['symbol'] = event['symbol']
logging.info(json.dumps({"method": "get_data_from_alpha_vantage"
, "action": "return_value", "data": requested_data, "event": loggable_event}))
def __fetch_data__(self, checks: ValidationRuleChecks, event: dict, loggable_event: dict):
url = self.__build_url_from_args__(event)
r = requests.get(url)
if self.__first_successful_attempt__ == 0:
self.__first_successful_attempt__ = time.perf_counter()
self.__total_calls__ += 1
checks.with_response(r)
logging.info(json.dumps({"method": "get_data_from_alpha_vantage", "action": "response_from_alphavantage"
, "status_code": r.status_code, "data": r.text, "event": loggable_event}))

return requested_data
def __validate_api_key__(self, checks: ValidationRuleChecks, event: dict):
# get api key if not provided
if checks.expect_api_key_in_event().failed():
event["apikey"] = self.__api_key__ # assume they passed to builder method.
elif self.__api_key__ is None or len(self.__api_key__) == 0 or "apikey" not in event \
or not event.get("apikey"): # consumer didn't tell me where to get api key
raise ApiKeyNotFound(
"You must call client.with_api_key([api_key]), create config file in your profile (i.e. ~/.alphavantage) or event[api_key] = [your api key] before retrieving data from alphavantage")

def __sleep__(self):
then = self.__first_successful_attempt__
now = time.perf_counter()
diff = 60 - (now - then)
logging.info(f"sleeping for {diff} seconds")
time.sleep(diff)

def clear_cache(self):
self.__cache__.clear()

return self
77 changes: 77 additions & 0 deletions alphavantage_api_client/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,60 @@ def normalize_fields(cls, values):
or k.startswith("Time Series Crypto (") else k: v for k, v in values.items()
}

def get_most_recent_value(self) -> Optional[dict]:
if len(self.data) > 0:
for quote_date in self.data:
quotes = self.data[quote_date]
quotes["query_date"] = quote_date
return quotes

return None


class GlobalQuote(BaseQuote):
data: dict = Field({}, alias='Global Quote')

def get_open_price(self) -> str:
field = "02. open"
return self.get_data_value(field)

def get_data_value(self, field) -> str:
if field in self.data:
return self.data[field]
return None

def get_high_price(self) -> str:
field = "03. high"
return self.get_data_value(field)

def get_low_price(self) -> str:
field = "04. low"
return self.get_data_value(field)

def get_price(self) -> str:
field = "05. price"
return self.get_data_value(field)

def get_volume(self) -> str:
field = "06. volume"
return self.get_data_value(field)

def get_latest_trading_day(self) -> str:
field = "07. latest trading day"
return self.get_data_value(field)

def get_previous_close_day(self) -> str:
field = "08. previous close"
return self.get_data_value(field)

def get_change_in_dollars(self) -> str:
field = "09. change"
return self.get_data_value(field)

def get_change_percent(self) -> str:
field = "10. change percent"
return self.get_data_value(field)


class AccountingReport(BaseQuote):
annualReports: list = Field(default=[], alias="annualReports")
Expand All @@ -61,6 +112,20 @@ def normalize_fields(cls, values):
values.pop(field)
return values

def get_most_recent_annual_report(self) -> Optional[dict]:
if len(self.annualReports) > 0:
for index, annual_report in enumerate(self.annualReports):
return annual_report

return None

def get_most_recent_quarterly_report(self) -> Optional[dict]:
if len(self.quarterlyReports) > 0:
for index, quarterly_report in enumerate(self.quarterlyReports):
return quarterly_report

return None


class RealGDP(BaseResponse):
name: Optional[str]
Expand Down Expand Up @@ -116,3 +181,15 @@ class CompanyOverview(BaseQuote):
shares_outstanding: str = Field(default=None, alias='SharesOutstanding')
dividend_date: str = Field(default=None, alias='DividendDate')
ex_dividend_date: str = Field(default=None, alias='ExDividendDate')

def get_ex_dividend_date(self):
"""
Alpha vantage api will return 'None' when there isn't an ex dividend date. This will return None or an ex
dividend date.
Returns:
"""
if self.ex_dividend_date is None or len(self.ex_dividend_date) == 0 or "None" == self.ex_dividend_date:
return None

return self.ex_dividend_date
Loading

0 comments on commit f2b4d54

Please sign in to comment.