diff --git a/locast/candle/candle_utility.py b/locast/candle/candle_utility.py index 85c150b..2b767ae 100644 --- a/locast/candle/candle_utility.py +++ b/locast/candle/candle_utility.py @@ -7,6 +7,8 @@ EnumType = TypeVar("EnumType", bound=Enum) +# TODO: Check what functions are not needed anymore. + class CandleUtility: @classmethod @@ -129,7 +131,6 @@ def assert_candle_unity(cls, candles: List[Candle]) -> None: mismatch = f"(candle #{index} is a mismatch: {candle})." raise AssertionError(f"{msg} {mismatch}") - # TODO: Benchmark this func @classmethod def detect_missing_dates( cls, @@ -194,6 +195,3 @@ def missing_dates_between( def midpoint(cls, start: datetime, end: datetime) -> datetime: # Calculate the midpoint between two datetime objects. return start + (end - start) / 2 - - -# TODO: Check what functions are not needed anymore. diff --git a/locast/candle/exchange_resolution.py b/locast/candle/exchange_resolution.py index 16bdc2c..ba53a04 100644 --- a/locast/candle/exchange_resolution.py +++ b/locast/candle/exchange_resolution.py @@ -20,7 +20,6 @@ class ResolutionDetail: notation: str -# TODO: Consider this ABC generally - or think of functionality that should be moved/created here. class ExchangeResolution(ABC): @classmethod def notation_to_resolution_detail(cls, notation: str) -> ResolutionDetail: diff --git a/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py b/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py index cdcd2be..22e20d6 100644 --- a/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py +++ b/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py @@ -83,12 +83,11 @@ async def fetch_candles( if self._log_progress: done += len(candle_batch) - log_progress("šŸš›", "candles", "fetched", done, total) + log_progress("šŸš›", market, "candles", "fetched", done, total) candles.extend(candle_batch) temp_end_date = candles[-1].started_at - # TODO: See if ordering total_missing_candles might make sense. if total_missing_candles: log_missing_candles( "šŸšØ", @@ -151,48 +150,43 @@ async def fetch_candles_up_to_now( return candles - # TODO: Missing candles can eff this up - but chances are tiny... async def find_horizon(self, market: str, resolution: ResolutionDetail) -> datetime: now = cu.normalized_now(resolution) - step = 1000 # TODO: Optimize this value (depending on resolution) + step = 1000 upper_bound = now lower_bound = None - # Step 1: Exponential Back-Off Search + # First: Exponential Back-Off Search to quickly reach over the horizon (datetime at which exchange does not provide data) while True: back_n = cu.subtract_n_resolutions(now, resolution, step) back_n_plus_resolution = cu.add_one_resolution(back_n, resolution) - # Query the single candle over the range [back_n, back_n + resolution] + # Query single candle candle = await self._fetcher.fetch( market, resolution, back_n, back_n_plus_resolution, ) - # print(f"back_n: {back_n}") - # Candle exists, continue back-off if candle: + # Candle still exists, continue back-off upper_bound = back_n # Move the upper bound further back - step *= 2 # Double the step size + step *= 2 else: - lower_bound = back_n # We've gone too far, set this as the lower bound + # Reached over the horizon, candle does not exist + lower_bound = back_n # Set this as the lower bound break - # Step 2: Binary Search within the determined range + # Second: Binary Search within the determined range delta = upper_bound - lower_bound - while delta.total_seconds() > resolution.seconds: mid_point = cu.norm_date(cu.midpoint(lower_bound, upper_bound), resolution) mid_point_plus_resolution = cu.add_one_resolution(mid_point, resolution) - # Query for a single candle covering [mid_point, mid_point + resolution] + # Query single candle candle = await self._fetcher.fetch( market, resolution, mid_point, mid_point_plus_resolution ) - # print(f"delta: {delta}") - # print(f"upper_bound: {upper_bound}") - # print(f"lower_bound: {lower_bound}") if candle: # Move the upper bound down (closer to the lower bound) upper_bound = mid_point @@ -201,7 +195,8 @@ async def find_horizon(self, market: str, resolution: ResolutionDetail) -> datet lower_bound = mid_point delta = upper_bound - lower_bound - # Step 3: Final Candle Fetch to Confirm the Oldest Available Candle + + # Finally: Fetch to confirm the oldest available candle upper_bound_plus_resolution = cu.add_one_resolution(upper_bound, resolution) final_candle = await self._fetcher.fetch( market, resolution, upper_bound, upper_bound_plus_resolution diff --git a/locast/candle_storage/sql/sqlite_candle_storage.py b/locast/candle_storage/sql/sqlite_candle_storage.py index a871ef4..c8847f3 100644 --- a/locast/candle_storage/sql/sqlite_candle_storage.py +++ b/locast/candle_storage/sql/sqlite_candle_storage.py @@ -151,7 +151,8 @@ async def _bulk_save_objects_batched( if self._log_progress: done += len(batch) - log_progress("šŸ“€", "candles", "stored", done, total) + market = batch[0].market + log_progress("šŸ“€", market, "candles", "stored", done, total) def _to_candles(self, database_candles: List[SqliteCandle]) -> List[Candle]: mapper = DatabaseCandleMapper(SqliteCandleMapping()) diff --git a/locast/logging_functions.py b/locast/logging_functions.py index d12fbf0..0b90c3d 100644 --- a/locast/logging_functions.py +++ b/locast/logging_functions.py @@ -5,15 +5,15 @@ from locast.candle.exchange_resolution import ResolutionDetail -# TODO: Also print market def log_progress( emoji: str, + market: str, group_name: str, past_tense: str, amount_done: int, total: int, ) -> None: - progress_message = f"{amount_done} of {total} {group_name} {past_tense}." + progress_message = f"{amount_done} of {total} {market}-{group_name} {past_tense}." if amount_done == total: print(f"\r{emoji} {progress_message} āœ…", end="\n", flush=True) else: @@ -37,5 +37,16 @@ def log_missing_candles( print(f" āŒ Candle missing: {date}.") +def log_start_date_shifted_to_horizon( + emoji: str, + exchange: Exchange, + start_date: datetime, + horizon: datetime, +) -> None: + print( + f"{emoji} Attention: Candle data on {exchange.name} only reaches back to {horizon}. To prevent over-fetching, the provided start date ({start_date}) will be shifted. {emoji}" + ) + + def log_redundant_call(emoji: str, message: str) -> None: print(f"{emoji} {message}") diff --git a/locast/store_manager/store_manager.py b/locast/store_manager/store_manager.py index cba1f29..2a89b7f 100644 --- a/locast/store_manager/store_manager.py +++ b/locast/store_manager/store_manager.py @@ -8,12 +8,12 @@ from locast.candle_fetcher.candle_fetcher import CandleFetcher from locast.candle_storage.cluster_info import ClusterInfo from locast.candle_storage.candle_storage import CandleStorage -from locast.logging_functions import log_redundant_call +from locast.logging_functions import ( + log_redundant_call, + log_start_date_shifted_to_horizon, +) -# TODO: Here we could call find_horizon, correct the start date accordingly and right at the beginning print an according message. -# If horizon <= start_date, all is fine, no correction necessary. Else, user is over fetching into the past, hence we replace start_date with horizon. -# NOTE: StoreManager might be a valid place to for this, as it allows clean caching of determined horizons, providing them for its life time. Then we could -# add find_horizon to the CandleFetcher Protocol. It would also only ever need to be called and cached at the beginning of create_cluster() +# TODO: Expand API by adding convenience methods such as def retrieve_segment(from, to, exchange, market, resolution) -> List[Candle]: ... class StoreManager: @@ -133,8 +133,6 @@ async def get_cluster_info( ) -> ClusterInfo: return await self._candle_storage.get_cluster_info(exchange, market, resolution) - # TODO: Expand API by adding convenience methods such as def retrieve_segment(from, to, exchange, market, resolution) -> List[Candle]: ... - async def _check_horizon( self, market: str, @@ -146,9 +144,12 @@ async def _check_horizon( self._horizon_cache[f"{market}_{resolution.notation}"] = horizon if start_date < horizon: - ex = self._candle_fetcher.exchange.name - print(f"Attention: Candles on {ex} only reach back to {horizon}.") - print(f"Start date ({start_date}) will be moved, to prevent conflicts.") + log_start_date_shifted_to_horizon( + "ā­ļø", + self._candle_fetcher.exchange, + start_date, + horizon, + ) start_date = horizon return start_date diff --git a/tests/candle_fetcher/test_candle_fetcher.py b/tests/candle_fetcher/test_candle_fetcher.py index faa2e9f..1c8c886 100644 --- a/tests/candle_fetcher/test_candle_fetcher.py +++ b/tests/candle_fetcher/test_candle_fetcher.py @@ -130,8 +130,8 @@ async def test_fetch_cluster_prints_progress_correctly( # then out, _ = capsys.readouterr() - assert f"of {amount_back} candles fetched." in out - assert f"šŸš› {amount_back} of {amount_back} candles fetched. āœ…" in out + assert f"of {amount_back} {market}-candles fetched." in out + assert f"šŸš› {amount_back} of {amount_back} {market}-candles fetched. āœ…" in out @pytest.mark.parametrize("candle_fetcher_mock", list(mocked_candle_fetchers.keys())) @@ -165,7 +165,6 @@ async def test_fetch_cluster_prints_missing_candles_correctly( assert out.count("āŒ Candle missing:") == n_missing -# TODO: TDD approach to (in integration though) for when fetcher hits candle horizon of exchange. This is currently being researched in autogluon spike... @pytest.mark.parametrize("candle_fetcher_mock", list(mocked_candle_fetchers.keys())) @pytest.mark.asyncio async def test_fetch_cluster_prints_missing_candles_on_batch_newest_edge_correctly( diff --git a/tests/helper/candle_mockery/v4_indexer_mock.py b/tests/helper/candle_mockery/v4_indexer_mock.py index 428ecea..fba5c28 100644 --- a/tests/helper/candle_mockery/v4_indexer_mock.py +++ b/tests/helper/candle_mockery/v4_indexer_mock.py @@ -8,7 +8,6 @@ from tests.helper.candle_mockery.dydx_candle_backend_mock import DydxCandleBackendMock -# TODO: Implement horizon - meaning: After n returns, return [] class V4MarketsClientMock(MarketsClient): def __init__(self) -> None: pass diff --git a/tests/logging/test_logging.py b/tests/logging/test_logging.py index 43c9e5e..ddb14ce 100644 --- a/tests/logging/test_logging.py +++ b/tests/logging/test_logging.py @@ -12,11 +12,13 @@ def test_log_progress_prints_correctly( action = "stored" emoji = "šŸ“€" + market = "BTC-USD" + result: List[str] = [] # when for done in range(1, total + 1): - log_progress(emoji, name, action, done, total) + log_progress(emoji, market, name, action, done, total) out, _ = capsys.readouterr() result.append(out) @@ -25,9 +27,9 @@ def test_log_progress_prints_correctly( # then expected: List[str] = [] for done in range(1, total + 1): - msg = f"šŸ“€ {done} of {total} {name} {action}.\r" + msg = f"šŸ“€ {done} of {total} {market}-{name} {action}.\r" expected.append(msg) - expected[-1] = f"\ršŸ“€ {total} of {total} {name} {action}. āœ…\n" + expected[-1] = f"\ršŸ“€ {total} of {total} {market}-{name} {action}. āœ…\n" assert result == expected diff --git a/tests/store_manager/test_store_manager.py b/tests/store_manager/test_store_manager.py index a1ea51b..73f400e 100644 --- a/tests/store_manager/test_store_manager.py +++ b/tests/store_manager/test_store_manager.py @@ -13,7 +13,6 @@ from tests.helper.candle_mockery.mock_dydx_v4_candles import mock_dydx_v4_candle_range -# FIXME: WIP - BACKEND MOCK NOT HANDLING HORIZON @pytest.mark.asyncio async def test_create_cluster_results_in_correct_cluster_state( store_manager_mock_memory: StoreManager,