This repository has been archived by the owner on Mar 12, 2021. It is now read-only.

Commit

removed inner wrapper on polygon_aggs
RileyMShea committed Feb 12, 2021
1 parent e9028ec commit 37fa104
Showing 2 changed files with 102 additions and 142 deletions.
6 changes: 6 additions & 0 deletions examples/basic.py

@@ -23,6 +23,12 @@
 end = pd.Timestamp("2020-01-01", tz=NY)
 aapl = async_polygon_aggs("GOOG", start, end, debug_mode=True)
 aapl
+#%%
+# period exceeding stock history
+start = pd.Timestamp("2012-06-01", tz=NY)
+end = pd.Timestamp("2018-01-01", tz=NY)
+shop = async_polygon_aggs("SHOP", start, end, debug_mode=True)
+shop
 #%% Check holiday close has a 12:59 bar
 aapl.loc["2019-07-03 12"].tail()
 #%%
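Note on the new example: async_polygon_aggs only accepts New-York-timezone-aware pandas Timestamps; the guard in the polygon_plus.py diff below raises a ValueError for anything else. A minimal sketch of that contract, assuming only pandas and an NY constant like the one basic.py uses:

import pandas as pd

NY = "America/New_York"  # assumed to match the NY constant imported in basic.py

start = pd.Timestamp("2012-06-01", tz=NY)  # accepted: tz-aware, NY zone
naive = pd.Timestamp("2012-06-01")         # would fail the guard: no tzinfo

assert start.tz.zone == "America/New_York"
assert naive.tz is None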
238 changes: 96 additions & 142 deletions upoly/polygon_plus.py

@@ -145,6 +145,7 @@ def combine_chunks(chunks: List[bytes]) -> Iterator[PolyAggResponse]:
         yield orjson.loads(chunk)
 
 
+@typed_cache
 def async_polygon_aggs(
     symbol: str,
     start: pd.Timestamp,
@@ -187,176 +188,129 @@
     >>> df = async_polygon_aggs("AAPL", start, end)
     """
 
-    def inner_wrapper(
-        symbol: str,
-        start: pd.Timestamp,
-        end: pd.Timestamp,
-        /,
-        timespan: Literal["minute", "hour", "day"] = "minute",
-        interval: int = 1,
-        unadjusted: bool = False,
-        max_chunk_days: int = 100,
-        debug_mode: bool = False,
-        paucity_threshold: float = 0.70,
-        df_dtype: Literal["np.float64", "np.float32"] = "np.float64",
-    ) -> Optional[pd.DataFrame]:
-        """This duplicated function of the outer signature exists b/c joblib's
-        implementation of it's memory.cache decorator breaks editor intellisense."""
-
-        if start.tz.zone != "America/New_York" or end.tz.zone != "America/New_York":
-            raise ValueError(
-                "start and end time must be a NYS, timezone-aware, pandas Timestamp"
-            )
+    if start.tz.zone != "America/New_York" or end.tz.zone != "America/New_York":
+        raise ValueError(
+            "start and end time must be a NYS, timezone-aware, pandas Timestamp"
+        )
 
-        periods = ceil((end - start).days / max_chunk_days)
+    periods = ceil((end - start).days / max_chunk_days)
 
-        intervals: Tuple[pd.Timestamp, pd.Timestamp] = pd.interval_range(
-            start=start, end=end + timedelta(days=1), periods=periods
-        ).to_tuples()
+    intervals: Tuple[pd.Timestamp, pd.Timestamp] = pd.interval_range(
+        start=start, end=end + timedelta(days=1), periods=periods
+    ).to_tuples()
 
-        print(f"Retrieving {periods} mini-batches for {symbol}...")
+    print(f"Retrieving {periods} mini-batches for {symbol}...")
 
-        network_io_start = perf_counter()
+    network_io_start = perf_counter()
 
-        raw_results = unwrap(
-            asyncio.run(
-                _dispatch_consume_polygon(
-                    intervals, symbol, timespan, interval, unadjusted
-                )
-            )
-        )
-        network_io_stop = perf_counter()
-        print(f"Data retrieved in {network_io_stop-network_io_start:.2f} seconds.")
-        print("Performing Transforms...")
+    raw_results = unwrap(
+        asyncio.run(
+            _dispatch_consume_polygon(intervals, symbol, timespan, interval, unadjusted)
+        )
+    )
+    network_io_stop = perf_counter()
+    print(f"Data retrieved in {network_io_stop-network_io_start:.2f} seconds.")
+    print("Performing Transforms...")
 
-        df: pd.DataFrame = pd.concat(
-            (
-                pd.DataFrame(orjson.loads(result_chunk).get("results", None))
-                for result_chunk in raw_results
-            ),
-            ignore_index=True,
-        )
+    df: pd.DataFrame = pd.concat(
+        (
+            pd.DataFrame(orjson.loads(result_chunk).get("results", None))
+            for result_chunk in raw_results
+        ),
+        ignore_index=True,
+    )
 
-        nyse: NYSEExchangeCalendar = mcal.get_calendar("NYSE")
-        schedule = nyse.schedule(start, end)
-        valid_minutes: pd.DatetimeIndex = mcal.date_range(schedule, "1min") - timedelta(
-            minutes=1
-        )
+    nyse: NYSEExchangeCalendar = mcal.get_calendar("NYSE")
+    schedule = nyse.schedule(start, end)
+    valid_minutes: pd.DatetimeIndex = mcal.date_range(schedule, "1min") - timedelta(
+        minutes=1
+    )
 
-        if df is None or df.empty:
-            print(f"No results for {symbol}.")
-            return None
-        df.t = pd.to_datetime(df.t.astype(int), unit="ms", utc=True)
-        df.set_index("t", inplace=True)
+    if df is None or df.empty:
+        print(f"No results for {symbol}.")
+        return None
+    df.t = pd.to_datetime(df.t.astype(int), unit="ms", utc=True)
+    df.set_index("t", inplace=True)
 
-        expected_sessions = schedule.shape[0]
+    expected_sessions = schedule.shape[0]
 
-        actual_sessions = df.groupby(df.index.date).count().shape[0]  # type:ignore
+    actual_sessions = df.groupby(df.index.date).count().shape[0]  # type:ignore
 
-        print(
-            f"{expected_sessions=}\t{actual_sessions=}\tpct_diff: {(actual_sessions/expected_sessions)-1.:+.2%}"
-        )
+    print(
+        f"{expected_sessions=}\t{actual_sessions=}\tpct_diff: {(actual_sessions/expected_sessions)-1.:+.2%}"
    )
 
-        expected_minutes = valid_minutes.shape[0]
+    expected_minutes = valid_minutes.shape[0]
 
-        actual_minutes = df.loc[~df.isnull().all(axis="columns")].shape[0]
+    actual_minutes = df.loc[~df.isnull().all(axis="columns")].shape[0]
 
-        print(
-            f"{expected_minutes=}\t{actual_minutes=}\tpct_diff: {(actual_minutes/expected_minutes)-1.:+.2%}"
-        )
+    print(
+        f"{expected_minutes=}\t{actual_minutes=}\tpct_diff: {(actual_minutes/expected_minutes)-1.:+.2%}"
+    )
 
-        if (duplicated_indice_count := df.index.duplicated().sum()) > 0:
-            print("\n")
-            print(f"Found the following duplicated indexes:")
-            print(df[df.index.duplicated(keep=False)].sort_index()["vw"])  # type: ignore
-            print(
-                f"Dropping {duplicated_indice_count} row(s) w/ duplicate Datetimeindex "
-            )
-            df = df[~df.index.duplicated()]  # type: ignore
+    if (duplicated_indice_count := df.index.duplicated().sum()) > 0:
+        print("\n")
+        print(f"Found the following duplicated indexes:")
+        print(df[df.index.duplicated(keep=False)].sort_index()["vw"])  # type: ignore
+        print(f"Dropping {duplicated_indice_count} row(s) w/ duplicate Datetimeindex ")
+        df = df[~df.index.duplicated()]  # type: ignore
 
-        print("Reindexing...")
-        df = df.reindex(valid_minutes)  # type: ignore
+    print("Reindexing...")
+    df = df.reindex(valid_minutes)  # type: ignore
 
-        print("After Reindexing by trading calender:")
+    print("After Reindexing by trading calender:")
 
-        actual_sessions = df.groupby(df.index.date).count().shape[0]  # type:ignore
-        actual_minutes = df.loc[~df.isnull().all(axis="columns")].shape[0]
+    actual_sessions = df.groupby(df.index.date).count().shape[0]  # type:ignore
+    actual_minutes = df.loc[~df.isnull().all(axis="columns")].shape[0]
 
-        print(f"{expected_sessions = }\t{actual_sessions = }")
-        print(f"{expected_minutes = }\t{actual_minutes = }")
+    print(f"{expected_sessions = }\t{actual_sessions = }")
+    print(f"{expected_minutes = }\t{actual_minutes = }")
 
-        pct_minutes_not_null = actual_minutes / expected_minutes
+    pct_minutes_not_null = actual_minutes / expected_minutes
 
-        print(f"{pct_minutes_not_null = :.3%}")
+    print(f"{pct_minutes_not_null = :.3%}")
 
-        if pct_minutes_not_null < paucity_threshold:
-            print(f"{symbol} below threshold: {paucity_threshold}")
-            return None
+    if pct_minutes_not_null < paucity_threshold:
+        print(f"{symbol} below threshold: {paucity_threshold}")
+        return None
 
-        if pct_minutes_not_null > 1.01:
-            raise ValueError(f"{pct_minutes_not_null=} Riley messed up, yell at him")
+    if pct_minutes_not_null > 1.01:
+        raise ValueError(f"{pct_minutes_not_null=} Riley messed up, yell at him")
 
-        # Rename polygon json keys to canonical pandas headers
-        df = df.rename(
-            columns={
-                "v": "volume",
-                "vw": "volwavg",
-                "o": "open",
-                "c": "close",
-                "h": "high",
-                "l": "low",
-                "t": "date",
-                "n": "trades",
-            }
-        )
-        # Converting UTC-aware timestamps to NY-naive
-        if isinstance(df.index, pd.DatetimeIndex):
-            df.index = df.index.tz_convert(NY).tz_localize(None)
-        else:
-            raise TypeError
+    # Rename polygon json keys to canonical pandas headers
+    df = df.rename(
+        columns={
+            "v": "volume",
+            "vw": "volwavg",
+            "o": "open",
+            "c": "close",
+            "h": "high",
+            "l": "low",
+            "t": "date",
+            "n": "trades",
+        }
+    )
+    # Converting UTC-aware timestamps to NY-naive
+    if isinstance(df.index, pd.DatetimeIndex):
+        df.index = df.index.tz_convert(NY).tz_localize(None)
+    else:
+        raise TypeError
 
-        # Force columns to specific type
-        # df = df.astype(np.float64)
+    # Force columns to specific type
+    # df = df.astype(np.float64)
 
-        # Reorder columns so that ohlcv comes first
-        df = df[["open", "high", "low", "close", "volume", "volwavg", "trades"]]
-        df.index.name = "time"
+    # Reorder columns so that ohlcv comes first
+    df = df[["open", "high", "low", "close", "volume", "volwavg", "trades"]]
+    df.index.name = "time"
 
-        first_index = df.index.min()
-        last_index = df.index.max()
+    first_index = df.index.min()
+    last_index = df.index.max()
 
-        print(f"{first_index = } {last_index = }")
+    print(f"{first_index = } {last_index = }")
 
-    if debug_mode:
-        return inner_wrapper(
-            symbol,
-            start,
-            end,
-            timespan,
-            interval,
-            unadjusted,
-            max_chunk_days,
-            debug_mode,
-            paucity_threshold,
-            df_dtype,
-        )
-    wrapped = memory.cache(inner_wrapper)(
-        symbol,
-        start,
-        end,
-        timespan,
-        interval,
-        unadjusted,
-        max_chunk_days,
-        debug_mode,
-        paucity_threshold,
-        df_dtype,
-    )
-    if isinstance(wrapped, pd.DataFrame):
-        return wrapped
-    elif wrapped is None:
+    if isinstance(df, pd.DataFrame):
+        return df
+    elif df is None:
         return None
     else:
-        raise ValueError(f"Expected Dataframe return; Got: {type(wrapped)}")
+        raise ValueError(f"Expected Dataframe return; Got: {type(df)}")
