Skip to content

Commit

Permalink
Refactor small stuff and clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
sirEven committed Nov 5, 2024
1 parent c711b59 commit 87348b9
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 43 deletions.
6 changes: 2 additions & 4 deletions locast/candle/candle_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

EnumType = TypeVar("EnumType", bound=Enum)

# TODO: Check what functions are not needed anymore.


class CandleUtility:
@classmethod
Expand Down Expand Up @@ -129,7 +131,6 @@ def assert_candle_unity(cls, candles: List[Candle]) -> None:
mismatch = f"(candle #{index} is a mismatch: {candle})."
raise AssertionError(f"{msg} {mismatch}")

# TODO: Benchmark this func
@classmethod
def detect_missing_dates(
cls,
Expand Down Expand Up @@ -194,6 +195,3 @@ def missing_dates_between(
def midpoint(cls, start: datetime, end: datetime) -> datetime:
# Calculate the midpoint between two datetime objects.
return start + (end - start) / 2


# TODO: Check what functions are not needed anymore.
1 change: 0 additions & 1 deletion locast/candle/exchange_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class ResolutionDetail:
notation: str


# TODO: Consider this ABC generally - or think of functionality that should be moved/created here.
class ExchangeResolution(ABC):
@classmethod
def notation_to_resolution_detail(cls, notation: str) -> ResolutionDetail:
Expand Down
29 changes: 12 additions & 17 deletions locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ async def fetch_candles(

if self._log_progress:
done += len(candle_batch)
log_progress("🚛", "candles", "fetched", done, total)
log_progress("🚛", market, "candles", "fetched", done, total)

candles.extend(candle_batch)
temp_end_date = candles[-1].started_at

# TODO: See if ordering total_missing_candles might make sense.
if total_missing_candles:
log_missing_candles(
"🚨",
Expand Down Expand Up @@ -151,48 +150,43 @@ async def fetch_candles_up_to_now(

return candles

# TODO: Missing candles can eff this up - but chances are tiny...
async def find_horizon(self, market: str, resolution: ResolutionDetail) -> datetime:
now = cu.normalized_now(resolution)
step = 1000 # TODO: Optimize this value (depending on resolution)
step = 1000
upper_bound = now
lower_bound = None

# Step 1: Exponential Back-Off Search
# First: Exponential Back-Off Search to quickly reach over the horizon (datetime at which exchange does not provide data)
while True:
back_n = cu.subtract_n_resolutions(now, resolution, step)
back_n_plus_resolution = cu.add_one_resolution(back_n, resolution)

# Query the single candle over the range [back_n, back_n + resolution]
# Query single candle
candle = await self._fetcher.fetch(
market,
resolution,
back_n,
back_n_plus_resolution,
)
# print(f"back_n: {back_n}")
# Candle exists, continue back-off
if candle:
# Candle still exists, continue back-off
upper_bound = back_n # Move the upper bound further back
step *= 2 # Double the step size
step *= 2
else:
lower_bound = back_n # We've gone too far, set this as the lower bound
# Reached over the horizon, candle does not exist
lower_bound = back_n # Set this as the lower bound
break

# Step 2: Binary Search within the determined range
# Second: Binary Search within the determined range
delta = upper_bound - lower_bound

while delta.total_seconds() > resolution.seconds:
mid_point = cu.norm_date(cu.midpoint(lower_bound, upper_bound), resolution)
mid_point_plus_resolution = cu.add_one_resolution(mid_point, resolution)

# Query for a single candle covering [mid_point, mid_point + resolution]
# Query single candle
candle = await self._fetcher.fetch(
market, resolution, mid_point, mid_point_plus_resolution
)
# print(f"delta: {delta}")
# print(f"upper_bound: {upper_bound}")
# print(f"lower_bound: {lower_bound}")
if candle:
# Move the upper bound down (closer to the lower bound)
upper_bound = mid_point
Expand All @@ -201,7 +195,8 @@ async def find_horizon(self, market: str, resolution: ResolutionDetail) -> datet
lower_bound = mid_point

delta = upper_bound - lower_bound
# Step 3: Final Candle Fetch to Confirm the Oldest Available Candle

# Finally: Fetch to confirm the oldest available candle
upper_bound_plus_resolution = cu.add_one_resolution(upper_bound, resolution)
final_candle = await self._fetcher.fetch(
market, resolution, upper_bound, upper_bound_plus_resolution
Expand Down
3 changes: 2 additions & 1 deletion locast/candle_storage/sql/sqlite_candle_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ async def _bulk_save_objects_batched(

if self._log_progress:
done += len(batch)
log_progress("📀", "candles", "stored", done, total)
market = batch[0].market
log_progress("📀", market, "candles", "stored", done, total)

def _to_candles(self, database_candles: List[SqliteCandle]) -> List[Candle]:
mapper = DatabaseCandleMapper(SqliteCandleMapping())
Expand Down
15 changes: 13 additions & 2 deletions locast/logging_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
from locast.candle.exchange_resolution import ResolutionDetail


# TODO: Also print market
def log_progress(
emoji: str,
market: str,
group_name: str,
past_tense: str,
amount_done: int,
total: int,
) -> None:
progress_message = f"{amount_done} of {total} {group_name} {past_tense}."
progress_message = f"{amount_done} of {total} {market}-{group_name} {past_tense}."
if amount_done == total:
print(f"\r{emoji} {progress_message} ✅", end="\n", flush=True)
else:
Expand All @@ -37,5 +37,16 @@ def log_missing_candles(
print(f" ❌ Candle missing: {date}.")


def log_start_date_shifted_to_horizon(
emoji: str,
exchange: Exchange,
start_date: datetime,
horizon: datetime,
) -> None:
print(
f"{emoji} Attention: Candle data on {exchange.name} only reaches back to {horizon}. To prevent over-fetching, the provided start date ({start_date}) will be shifted. {emoji}"
)


def log_redundant_call(emoji: str, message: str) -> None:
print(f"{emoji} {message}")
21 changes: 11 additions & 10 deletions locast/store_manager/store_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from locast.candle_fetcher.candle_fetcher import CandleFetcher
from locast.candle_storage.cluster_info import ClusterInfo
from locast.candle_storage.candle_storage import CandleStorage
from locast.logging_functions import log_redundant_call
from locast.logging_functions import (
log_redundant_call,
log_start_date_shifted_to_horizon,
)

# TODO: Here we could call find_horizon, correct the start date accordingly and right at the beginning print an according message.
# If horizon <= start_date, all is fine, no correction necessary. Else, user is over fetching into the past, hence we replace start_date with horizon.
# NOTE: StoreManager might be a valid place to for this, as it allows clean caching of determined horizons, providing them for its life time. Then we could
# add find_horizon to the CandleFetcher Protocol. It would also only ever need to be called and cached at the beginning of create_cluster()
# TODO: Expand API by adding convenience methods such as def retrieve_segment(from, to, exchange, market, resolution) -> List[Candle]: ...


class StoreManager:
Expand Down Expand Up @@ -133,8 +133,6 @@ async def get_cluster_info(
) -> ClusterInfo:
return await self._candle_storage.get_cluster_info(exchange, market, resolution)

# TODO: Expand API by adding convenience methods such as def retrieve_segment(from, to, exchange, market, resolution) -> List[Candle]: ...

async def _check_horizon(
self,
market: str,
Expand All @@ -146,9 +144,12 @@ async def _check_horizon(
self._horizon_cache[f"{market}_{resolution.notation}"] = horizon

if start_date < horizon:
ex = self._candle_fetcher.exchange.name
print(f"Attention: Candles on {ex} only reach back to {horizon}.")
print(f"Start date ({start_date}) will be moved, to prevent conflicts.")
log_start_date_shifted_to_horizon(
"⏭️",
self._candle_fetcher.exchange,
start_date,
horizon,
)
start_date = horizon
return start_date

Expand Down
5 changes: 2 additions & 3 deletions tests/candle_fetcher/test_candle_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ async def test_fetch_cluster_prints_progress_correctly(

# then
out, _ = capsys.readouterr()
assert f"of {amount_back} candles fetched." in out
assert f"🚛 {amount_back} of {amount_back} candles fetched. ✅" in out
assert f"of {amount_back} {market}-candles fetched." in out
assert f"🚛 {amount_back} of {amount_back} {market}-candles fetched. ✅" in out


@pytest.mark.parametrize("candle_fetcher_mock", list(mocked_candle_fetchers.keys()))
Expand Down Expand Up @@ -165,7 +165,6 @@ async def test_fetch_cluster_prints_missing_candles_correctly(
assert out.count("❌ Candle missing:") == n_missing


# TODO: TDD approach to (in integration though) for when fetcher hits candle horizon of exchange. This is currently being researched in autogluon spike...
@pytest.mark.parametrize("candle_fetcher_mock", list(mocked_candle_fetchers.keys()))
@pytest.mark.asyncio
async def test_fetch_cluster_prints_missing_candles_on_batch_newest_edge_correctly(
Expand Down
1 change: 0 additions & 1 deletion tests/helper/candle_mockery/v4_indexer_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from tests.helper.candle_mockery.dydx_candle_backend_mock import DydxCandleBackendMock


# TODO: Implement horizon - meaning: After n returns, return []
class V4MarketsClientMock(MarketsClient):
def __init__(self) -> None:
pass
Expand Down
8 changes: 5 additions & 3 deletions tests/logging/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ def test_log_progress_prints_correctly(
action = "stored"
emoji = "📀"

market = "BTC-USD"

result: List[str] = []

# when
for done in range(1, total + 1):
log_progress(emoji, name, action, done, total)
log_progress(emoji, market, name, action, done, total)
out, _ = capsys.readouterr()
result.append(out)

Expand All @@ -25,9 +27,9 @@ def test_log_progress_prints_correctly(
# then
expected: List[str] = []
for done in range(1, total + 1):
msg = f"📀 {done} of {total} {name} {action}.\r"
msg = f"📀 {done} of {total} {market}-{name} {action}.\r"
expected.append(msg)

expected[-1] = f"\r📀 {total} of {total} {name} {action}. ✅\n"
expected[-1] = f"\r📀 {total} of {total} {market}-{name} {action}. ✅\n"

assert result == expected
1 change: 0 additions & 1 deletion tests/store_manager/test_store_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from tests.helper.candle_mockery.mock_dydx_v4_candles import mock_dydx_v4_candle_range


# FIXME: WIP - BACKEND MOCK NOT HANDLING HORIZON
@pytest.mark.asyncio
async def test_create_cluster_results_in_correct_cluster_state(
store_manager_mock_memory: StoreManager,
Expand Down

0 comments on commit 87348b9

Please sign in to comment.