From c7fab1b6c6e86422c19e8df552ce7a0e8ca3b240 Mon Sep 17 00:00:00 2001 From: sirEven Date: Fri, 4 Oct 2024 01:40:41 +0200 Subject: [PATCH] Refactor code related to candle fetching and introduce integrity violation handling. This is a first pass and needs some polishing. --- locast/candle/candle_utility.py | 30 ++++++---- .../dydx/api_fetcher/dydx_v3_fetcher.py | 51 +++++++++-------- .../dydx/api_fetcher/dydx_v4_fetcher.py | 27 +++++---- .../candle_fetcher/dydx_candle_fetcher.py | 56 ++++++++++--------- .../{api_exception.py => exceptions.py} | 6 +- locast/logging.py | 22 ++++++++ notebooks/store_manger_example.ipynb | 13 +++-- tests/candle/test_candle_utility.py | 19 ++++--- .../integration/test_candle_fetcher.py | 7 +-- .../integration/test_dydx_v3_fetcher.py | 2 +- .../integration/test_dydx_v4_fetcher.py | 26 ++++++--- tests/candle_fetcher/test_candle_fetcher.py | 4 +- tests/store_manager/test_store_manager.py | 5 +- 13 files changed, 162 insertions(+), 106 deletions(-) rename locast/candle_fetcher/{api_exception.py => exceptions.py} (54%) diff --git a/locast/candle/candle_utility.py b/locast/candle/candle_utility.py index 9bd29e1..47036fa 100644 --- a/locast/candle/candle_utility.py +++ b/locast/candle/candle_utility.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import List, TypeVar +from typing import List, TypeVar, Tuple from zoneinfo import ZoneInfo from datetime import datetime, timedelta, timezone from locast.candle.candle import Candle @@ -129,19 +129,27 @@ def assert_candle_unity(cls, candles: List[Candle]) -> None: mismatch = f"(candle #{index} is a mismatch: {candle})." 
raise AssertionError(f"{msg} {mismatch}") + # TODO: Benchmark this func @classmethod - def assert_chronologic_order(cls, candles: List[Candle]) -> None: - # All date diffs == 1 res (no gaps, no duplicates, right order) - assert candles - res_sec = candles[0].resolution.seconds + def find_integrity_violations( + cls, + candles: List[Candle], + ) -> List[Tuple[Candle, Candle]]: + violations: List[Tuple[Candle, Candle]] = [] + + if len(candles) <= 1: + return violations + + diff_ok = timedelta(seconds=candles[0].resolution.seconds) for i, candle in enumerate(candles): if i > 0: - msg = "Order violated from Candles" - new = candles[i - 1].started_at - old = candle.started_at - assert candles[i - 1].started_at - candle.started_at == timedelta( - seconds=res_sec - ), f"{msg} {candle.id} ({old}) to {candles[i - 1].id} ({new})." + new = candles[i - 1] + old = candle + if (new.started_at - old.started_at) == diff_ok: + pass + else: + violations.append((old, new)) + return violations @classmethod def amount_of_candles_in_range( diff --git a/locast/candle_fetcher/dydx/api_fetcher/dydx_v3_fetcher.py b/locast/candle_fetcher/dydx/api_fetcher/dydx_v3_fetcher.py index ab64fe0..7fdc863 100644 --- a/locast/candle_fetcher/dydx/api_fetcher/dydx_v3_fetcher.py +++ b/locast/candle_fetcher/dydx/api_fetcher/dydx_v3_fetcher.py @@ -14,8 +14,11 @@ datetime_to_dydx_iso_str, ) from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher +from locast.candle_fetcher.exceptions import APIException +# TODO: Sort out Exception handling - maybe here an APIException might be fine - but it only should be raised when something goes wrong api wise - NOT when integrity errors or empty lists come back. +# OR no exception at all?? 
class DydxV3Fetcher(DydxFetcher): def __init__(self, client: Client, rate_throttle_sec: float = 0.4) -> None: self._exchange = Exchange.DYDX @@ -36,26 +39,30 @@ async def fetch( start_date: datetime, end_date: datetime, ) -> List[Candle]: - # If only one candle is requested this prevents black magic in v3 backend - if cu.amount_of_candles_in_range(start_date, end_date, resolution) == 1: - start_date -= timedelta(seconds=1) - - now = datetime.now() - time_since_last_request = (now - self._last_request_time).total_seconds() - - if time_since_last_request < self._throttle_rate: - await asyncio.sleep(self._throttle_rate - time_since_last_request) - - loop = asyncio.get_running_loop() - response: Dict[str, Any] = await loop.run_in_executor( - None, - lambda: self._client.public.get_candles( # type: ignore - market, - resolution.notation, - from_iso=datetime_to_dydx_iso_str(start_date), - to_iso=datetime_to_dydx_iso_str(end_date), - ).data, - ) - - self._last_request_time = datetime.now() + try: + # If only one candle is requested this prevents black magic in v3 backend + if cu.amount_of_candles_in_range(start_date, end_date, resolution) == 1: + start_date -= timedelta(seconds=1) + + now = datetime.now() + time_since_last_request = (now - self._last_request_time).total_seconds() + + if time_since_last_request < self._throttle_rate: + await asyncio.sleep(self._throttle_rate - time_since_last_request) + + loop = asyncio.get_running_loop() + response: Dict[str, Any] = await loop.run_in_executor( + None, + lambda: self._client.public.get_candles( # type: ignore + market, + resolution.notation, + from_iso=datetime_to_dydx_iso_str(start_date), + to_iso=datetime_to_dydx_iso_str(end_date), + ).data, + ) + + self._last_request_time = datetime.now() + except Exception as e: + raise APIException(self._exchange, market, resolution, e) + return self._mapper.to_candles(response["candles"]) # type: ignore diff --git a/locast/candle_fetcher/dydx/api_fetcher/dydx_v4_fetcher.py 
b/locast/candle_fetcher/dydx/api_fetcher/dydx_v4_fetcher.py index 6cd3d42..2c514dd 100644 --- a/locast/candle_fetcher/dydx/api_fetcher/dydx_v4_fetcher.py +++ b/locast/candle_fetcher/dydx/api_fetcher/dydx_v4_fetcher.py @@ -15,8 +15,11 @@ datetime_to_dydx_iso_str, ) from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher +from locast.candle_fetcher.exceptions import APIException +# TODO: Sort out Exception handling - maybe here an APIException might be fine - but it only should be raised when something goes wrong api wise - NOT when integrity errors or empty lists come back. +# OR no exception at all?? class DydxV4Fetcher(DydxFetcher): def __init__( self, @@ -37,14 +40,18 @@ async def fetch( start_date: datetime, end_date: datetime, ) -> List[Candle]: - response: Dict[ - str, - Any, - ] = await self._client.markets.get_perpetual_market_candles( # type: ignore - market=market, - resolution=resolution.notation, - from_iso=datetime_to_dydx_iso_str(start_date), - to_iso=datetime_to_dydx_iso_str(end_date), - ) - assert response["candles"] + try: + response: Dict[ + str, + Any, + ] = await self._client.markets.get_perpetual_market_candles( # type: ignore + market=market, + resolution=resolution.notation, + from_iso=datetime_to_dydx_iso_str(start_date), + to_iso=datetime_to_dydx_iso_str(end_date), + ) + + except Exception as e: + raise APIException(self._exchange, market, resolution, e) + return self._mapper.to_candles(response["candles"]) diff --git a/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py b/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py index 94c5a91..05caae1 100644 --- a/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py +++ b/locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List +from typing import List, Tuple from locast.candle.candle import Candle @@ -7,10 +7,10 @@ from locast.candle.candle_utility import 
CandleUtility as cu from locast.candle.exchange import Exchange from locast.candle.resolution import ResolutionDetail -from locast.candle_fetcher.api_exception import APIException from locast.candle_fetcher.candle_fetcher import CandleFetcher from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher -from locast.logging import log_progress +from locast.candle_fetcher.exceptions import APIException +from locast.logging import log_integrity_violations, log_progress class DydxCandleFetcher(CandleFetcher): @@ -49,9 +49,9 @@ async def fetch_candles( candles: List[Candle] = [] temp_end_date = end_date - iteration = 0 - - total, done = self._create_log_vars(resolution, start_date, end_date) + total = cu.amount_of_candles_in_range(start_date, end_date, resolution) + done = 0 + violations: List[Tuple[Candle, Candle]] = [] try: while (not candles) or candles[-1].started_at > start_date: @@ -62,16 +62,32 @@ async def fetch_candles( temp_end_date, ) - candles.extend(candle_batch) - temp_end_date = candles[-1].started_at - iteration += 1 + if len(candle_batch) > 0: + for violation in cu.find_integrity_violations(candle_batch): + violations.append(violation) + total -= 1 + + if self._log_progress: + done += len(candle_batch) + log_progress("šŸš›", "candles", "fetched", done, total) + + candles.extend(candle_batch) + temp_end_date = candles[-1].started_at - if self._log_progress: - done += len(candle_batch) - log_progress("šŸš›", "candles", "fetched", done, total) + else: + break + + if len(violations) > 0: + log_integrity_violations( + "šŸšØ", + self._exchange, + market, + resolution, + violations, + ) except Exception as e: - raise APIException(self._exchange, market, resolution, e) from e + raise APIException(self._exchange, market, resolution, e) return candles @@ -122,17 +138,3 @@ async def fetch_candles_up_to_now( temp_now_minus_res = cu.subtract_n_resolutions(temp_norm_now, resolution, 1) return candles - - def _create_log_vars( - self, - resolution: 
ResolutionDetail, - start_date: datetime, - end_date: datetime, - ): - if self._log_progress: - total = cu.amount_of_candles_in_range(start_date, end_date, resolution) - done = 0 - else: - done = 0 - total = 0 - return total, done diff --git a/locast/candle_fetcher/api_exception.py b/locast/candle_fetcher/exceptions.py similarity index 54% rename from locast/candle_fetcher/api_exception.py rename to locast/candle_fetcher/exceptions.py index bc13923..9f5a234 100644 --- a/locast/candle_fetcher/api_exception.py +++ b/locast/candle_fetcher/exceptions.py @@ -9,8 +9,6 @@ def __init__( market: str, resolution: ResolutionDetail, exception: Exception, - message: str | None = None, ) -> None: - if message is None: - message = f"{exchange.name}: Error fetching market data for market '{market}' and resolution '{resolution.notation}: {exception}'." - super().__init__(message) + msg = f"{exchange.name}: Error fetching market data for market '{market}' and resolution '{resolution.notation}: {exception}'." 
+ super().__init__(msg) diff --git a/locast/logging.py b/locast/logging.py index 942278d..36a2633 100644 --- a/locast/logging.py +++ b/locast/logging.py @@ -1,3 +1,10 @@ +from typing import List, Tuple + +from locast.candle.candle import Candle +from locast.candle.exchange import Exchange +from locast.candle.resolution import ResolutionDetail + + def log_progress( emoji: str, group_name: str, @@ -12,5 +19,20 @@ def log_progress( print(f"{emoji} {progress_message}", end="\r", flush=True) +def log_integrity_violations( + emoji: str, + exchange: Exchange, + market: str, + resolution: ResolutionDetail, + violations: List[Tuple[Candle, Candle]], +) -> None: + n = len(violations) + v = "integrity violations" if n > 1 else "integrity violation" + detail = f"{market}, {resolution.notation}" + print(f"{emoji} Attention: {exchange.name} delivered {n} {v} for {detail} {emoji}") + for v in violations: + print(f" āŒ {v[0].started_at} - {v[1].started_at}") + + def log_redundant_call(emoji: str, message: str) -> None: print(f"{emoji} {message}") diff --git a/notebooks/store_manger_example.ipynb b/notebooks/store_manger_example.ipynb index 54c4d3d..68d5ddf 100644 --- a/notebooks/store_manger_example.ipynb +++ b/notebooks/store_manger_example.ipynb @@ -17,9 +17,11 @@ "source": [ "from datetime import timedelta\n", "import time\n", - "\n", + "from dydx3 import Client # type: ignore\n", + "from dydx3.constants import API_HOST_MAINNET # type: ignore\n", "from dydx_v4_client.indexer.rest.indexer_client import IndexerClient # type: ignore\n", "from dydx_v4_client.network import make_mainnet # type: ignore\n", + "from locast.candle_fetcher.dydx.api_fetcher.dydx_v3_fetcher import DydxV3Fetcher\n", "from sqlalchemy import create_engine # type: ignore\n", "\n", "\n", @@ -40,14 +42,15 @@ " node_url=\"dydx-ops-rpc.kingnodes.com:443\",\n", ")\n", "\n", - "DELETE_CLUSTER = False\n", + "DELETE_CLUSTER = True\n", "\n", "\n", + "# TODO: Duplicate file and do same stuff with v3 or pass store 
manager to example func?\n", "async def dydx_v4_example() -> None:\n", " # Create store manager.\n", " engine = create_engine(\"sqlite:///locast.db\")\n", " candle_storage = SqliteCandleStorage(engine, log_progress=True)\n", - "\n", + " dydx_v3_fetcher = DydxV3Fetcher(Client(host=API_HOST_MAINNET))\n", " dydx_v4_fetcher = DydxV4Fetcher(IndexerClient(MAINNET.rest_indexer))\n", " candle_fetcher = DydxCandleFetcher(dydx_v4_fetcher, log_progress=True)\n", "\n", @@ -79,7 +82,7 @@ " # If not, update cluster.\n", " eth_info = await manager.get_cluster_info(exchange, market, resolution)\n", " if not eth_info.is_uptodate:\n", - " print(\"šŸ”„ Updating cluster...\")\n", + " print(\"\\nšŸ”„ Updating cluster...\")\n", " start = time.time()\n", " await manager.update_cluster(exchange, market, resolution)\n", " print(f\"āœ… Cluster updated in {round(time.time() - start, 2)} seconds.\")\n", @@ -92,7 +95,7 @@ "\n", " # Set DELETE flag at the top to True, in order to try out deleting the eth candle cluster\n", " if DELETE_CLUSTER:\n", - " print(\"šŸ—‘ļø Deleting cluster...\")\n", + " print(\"\\nšŸ—‘ļø Deleting cluster...\")\n", " start = time.time()\n", " await manager.delete_cluster(exchange, market, resolution)\n", " print(f\"āœ… Cluster deleted in {round(time.time() - start, 2)} seconds.\")\n", diff --git a/tests/candle/test_candle_utility.py b/tests/candle/test_candle_utility.py index f7eb539..61e77fd 100644 --- a/tests/candle/test_candle_utility.py +++ b/tests/candle/test_candle_utility.py @@ -49,26 +49,31 @@ def test_is_newest_valid_candles_returns_false() -> None: assert uc.is_newest_valid_candle(candle) is False -def test_assert_chronological_order_returns_true( +def test_find_integrity_violations_returns_empty( dydx_v4_eth_one_min_mock_candles: List[Candle], ) -> None: # given candles = dydx_v4_eth_one_min_mock_candles - # when & then - uc.assert_chronologic_order(candles) + # when + violations = uc.find_integrity_violations(candles) + # then + assert len(violations) == 
0 -def test_assert_chronological_order_returns_false( + +def test_find_integrity_violations_returns_violations( dydx_v4_eth_one_min_mock_candles: List[Candle], ) -> None: # given candles = dydx_v4_eth_one_min_mock_candles faulty_candles = [candles[0], candles[1], candles[3], candles[4]] - # when & then - with pytest.raises(AssertionError): - uc.assert_chronologic_order(faulty_candles) + # when + violations = uc.find_integrity_violations(faulty_candles) + + # then + assert len(violations) > 0 def test_assert_candle_unity_returns_true( diff --git a/tests/candle_fetcher/integration/test_candle_fetcher.py b/tests/candle_fetcher/integration/test_candle_fetcher.py index eaa2465..718cf52 100644 --- a/tests/candle_fetcher/integration/test_candle_fetcher.py +++ b/tests/candle_fetcher/integration/test_candle_fetcher.py @@ -53,8 +53,6 @@ async def test_fetch_range_of_candles_testnet( ) # then - amount = cu.amount_of_candles_in_range(start, end, res) - assert len(candles) == amount assert candles[-1].started_at == start assert candles[0].started_at == end - timedelta(seconds=res.seconds) @@ -82,8 +80,6 @@ async def test_fetch_range_of_candles_mainnet( ) # then - amount = cu.amount_of_candles_in_range(start, end, res) - assert len(candles) == amount assert candles[-1].started_at == start assert candles[0].started_at == end - timedelta(seconds=res.seconds) @@ -105,6 +101,7 @@ async def test_fetch_cluster_is_up_to_date( now_rounded = cu.norm_date(now_utc_iso(), res) start_date = now_rounded - timedelta(seconds=res.seconds * amount_back) + # when candles = await fetcher.fetch_candles_up_to_now( "ETH-USD", res, @@ -113,6 +110,4 @@ async def test_fetch_cluster_is_up_to_date( # then cu.assert_candle_unity(candles) - cu.assert_chronologic_order(candles) assert cu.is_newest_valid_candle(candles[0]) - assert len(candles) >= amount_back diff --git a/tests/candle_fetcher/integration/test_dydx_v3_fetcher.py b/tests/candle_fetcher/integration/test_dydx_v3_fetcher.py index 7b2b5f4..b5c7461 
100644 --- a/tests/candle_fetcher/integration/test_dydx_v3_fetcher.py +++ b/tests/candle_fetcher/integration/test_dydx_v3_fetcher.py @@ -16,7 +16,7 @@ from locast.candle_fetcher.dydx.api_fetcher.dydx_v4_fetcher import DydxV4Fetcher -# NOTE: TDD test in order to find a fix for dydx v3 "backend magic" +# NOTE: TDD in order to find a fix for dydx v3 "backend magic" @pytest.mark.asyncio async def test_v3_fetch_range_of_one_candle_returns_one_candle() -> None: # given diff --git a/tests/candle_fetcher/integration/test_dydx_v4_fetcher.py b/tests/candle_fetcher/integration/test_dydx_v4_fetcher.py index f0e5439..f918729 100644 --- a/tests/candle_fetcher/integration/test_dydx_v4_fetcher.py +++ b/tests/candle_fetcher/integration/test_dydx_v4_fetcher.py @@ -1,26 +1,36 @@ +from typing import List import pytest from sir_utilities.date_time import string_to_datetime +from locast.candle.candle_utility import CandleUtility as cu from locast.candle.dydx.dydx_resolution import DydxResolution from locast.candle_fetcher.dydx.candle_fetcher.dydx_candle_fetcher import ( DydxCandleFetcher, ) -# This test exists only to see, wether the v4 backend is being maintained to sometime include this candle again or not (which I'm sure will not happen). -# Order violated from Candles None (2024-07-25 06:52:00+00:00) to None (2024-07-25 06:54:00+00:00) -# Meaning: The (mainnet!) backend is actually missing one candle (which startedAt 2024-07-25 06:53:00+00:00) -@pytest.mark.skip(reason="This is only to check if dYdX fixed their missing candle.") +# This test exists only to see whether the v4 backend is being maintained to eventually include the missing candles again (which I'm sure will not happen). +# Meaning: The (mainnet!)
backend is actually missing candles + + +missing_candle_dates: List[str] = [ + "2024-07-25T06:53:00.000Z", + "2024-10-03T14:13:00.000Z", +] + + +# @pytest.mark.skip(reason="This is only to check if dYdX fixed their missing candle.") +@pytest.mark.parametrize("started_at", missing_candle_dates) @pytest.mark.asyncio -async def test_candle_error_at_2024_07_25_06_52( - dydx_v4_candle_fetcher_mainnet: DydxCandleFetcher, +async def test_missing_candles_on_mainnet( + dydx_v4_candle_fetcher_mainnet: DydxCandleFetcher, started_at: str ) -> None: # given res = DydxResolution.ONE_MINUTE fetcher = dydx_v4_candle_fetcher_mainnet - start = string_to_datetime("2024-07-25T06:53:00.000Z") - end = string_to_datetime("2024-07-25T06:54:00.000Z") + start = string_to_datetime(started_at) + end = cu.add_one_resolution(start, res) # when candles = await fetcher.fetch_candles( diff --git a/tests/candle_fetcher/test_candle_fetcher.py b/tests/candle_fetcher/test_candle_fetcher.py index 47dd745..6b5573a 100644 --- a/tests/candle_fetcher/test_candle_fetcher.py +++ b/tests/candle_fetcher/test_candle_fetcher.py @@ -7,10 +7,10 @@ from locast.candle.dydx.dydx_resolution import DydxResolution from locast.candle.resolution import ResolutionDetail from locast.candle_fetcher.dydx.candle_fetcher.dydx_candle_fetcher import ( - APIException, DydxCandleFetcher, ) +from locast.candle_fetcher.exceptions import APIException from tests.helper.parametrization.list_of_resolution_details import resolutions from tests.conftest import get_typed_fixture @@ -72,9 +72,7 @@ async def test_fetch_cluster_is_up_to_date( # then cu.assert_candle_unity(candles) - cu.assert_chronologic_order(candles) assert cu.is_newest_valid_candle(candles[0]) - assert len(candles) >= amount_back @pytest.mark.parametrize("candle_fetcher_mock", mocked_candle_fetchers) diff --git a/tests/store_manager/test_store_manager.py b/tests/store_manager/test_store_manager.py index cdeb520..11459b6 100644 --- a/tests/store_manager/test_store_manager.py 
+++ b/tests/store_manager/test_store_manager.py @@ -172,7 +172,7 @@ async def test_update_cluster_results_in_valid_cluster( cluster = await storage.retrieve_cluster(exchange, market, resolution) cu.assert_candle_unity(cluster) - cu.assert_chronologic_order(cluster) + assert len(cu.find_integrity_violations(cluster)) == 0 assert info.head assert cu.is_newest_valid_candle(info.head) assert cluster[0] == info.head @@ -268,10 +268,11 @@ async def test_retrieve_cluster_results_in_correct_cluster( # then info = await storage.get_cluster_info(exchange, market, resolution) + assert cluster[0] == info.head assert cluster[-1] == info.tail cu.assert_candle_unity(cluster) - cu.assert_chronologic_order(cluster) + assert len(cu.find_integrity_violations(cluster)) == 0 @pytest.mark.asyncio