Skip to content

Commit

Permalink
Refactor code related to candle fetching and introduce integrity viol…
Browse files Browse the repository at this point in the history
…ation handling. This is a first pass and needs some polishing.
  • Loading branch information
sirEven committed Oct 3, 2024
1 parent cb13d6e commit c7fab1b
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 106 deletions.
30 changes: 19 additions & 11 deletions locast/candle/candle_utility.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import List, TypeVar
from typing import List, TypeVar, Tuple
from zoneinfo import ZoneInfo
from datetime import datetime, timedelta, timezone
from locast.candle.candle import Candle
Expand Down Expand Up @@ -129,19 +129,27 @@ def assert_candle_unity(cls, candles: List[Candle]) -> None:
mismatch = f"(candle #{index} is a mismatch: {candle})."
raise AssertionError(f"{msg} {mismatch}")

# TODO: Benchmark this func
@classmethod
def assert_chronologic_order(cls, candles: List[Candle]) -> None:
# All date diffs == 1 res (no gaps, no duplicates, right order)
assert candles
res_sec = candles[0].resolution.seconds
def find_integrity_violations(
cls,
candles: List[Candle],
) -> List[Tuple[Candle, Candle]]:
violations: List[Tuple[Candle, Candle]] = []

if len(candles) <= 1:
return violations

diff_ok = timedelta(seconds=candles[0].resolution.seconds)
for i, candle in enumerate(candles):
if i > 0:
msg = "Order violated from Candles"
new = candles[i - 1].started_at
old = candle.started_at
assert candles[i - 1].started_at - candle.started_at == timedelta(
seconds=res_sec
), f"{msg} {candle.id} ({old}) to {candles[i - 1].id} ({new})."
new = candles[i - 1]
old = candle
if (new.started_at - old.started_at) == diff_ok:
pass
else:
violations.append((old, new))
return violations

@classmethod
def amount_of_candles_in_range(
Expand Down
51 changes: 29 additions & 22 deletions locast/candle_fetcher/dydx/api_fetcher/dydx_v3_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
datetime_to_dydx_iso_str,
)
from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher
from locast.candle_fetcher.exceptions import APIException


# TODO: Sort out Exception handling - maybe here an APIException might be fine - but it only should be raised when something goes wrong api wise - NOT when integrity errors or empty lists come back.
# OR no exception at all??
class DydxV3Fetcher(DydxFetcher):
def __init__(self, client: Client, rate_throttle_sec: float = 0.4) -> None:
self._exchange = Exchange.DYDX
Expand All @@ -36,26 +39,30 @@ async def fetch(
start_date: datetime,
end_date: datetime,
) -> List[Candle]:
# If only one candle is requested this prevents black magic in v3 backend
if cu.amount_of_candles_in_range(start_date, end_date, resolution) == 1:
start_date -= timedelta(seconds=1)

now = datetime.now()
time_since_last_request = (now - self._last_request_time).total_seconds()

if time_since_last_request < self._throttle_rate:
await asyncio.sleep(self._throttle_rate - time_since_last_request)

loop = asyncio.get_running_loop()
response: Dict[str, Any] = await loop.run_in_executor(
None,
lambda: self._client.public.get_candles( # type: ignore
market,
resolution.notation,
from_iso=datetime_to_dydx_iso_str(start_date),
to_iso=datetime_to_dydx_iso_str(end_date),
).data,
)

self._last_request_time = datetime.now()
try:
# If only one candle is requested this prevents black magic in v3 backend
if cu.amount_of_candles_in_range(start_date, end_date, resolution) == 1:
start_date -= timedelta(seconds=1)

now = datetime.now()
time_since_last_request = (now - self._last_request_time).total_seconds()

if time_since_last_request < self._throttle_rate:
await asyncio.sleep(self._throttle_rate - time_since_last_request)

loop = asyncio.get_running_loop()
response: Dict[str, Any] = await loop.run_in_executor(
None,
lambda: self._client.public.get_candles( # type: ignore
market,
resolution.notation,
from_iso=datetime_to_dydx_iso_str(start_date),
to_iso=datetime_to_dydx_iso_str(end_date),
).data,
)

self._last_request_time = datetime.now()
except Exception as e:
raise APIException(self._exchange, market, resolution, e)

return self._mapper.to_candles(response["candles"]) # type: ignore
27 changes: 17 additions & 10 deletions locast/candle_fetcher/dydx/api_fetcher/dydx_v4_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
datetime_to_dydx_iso_str,
)
from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher
from locast.candle_fetcher.exceptions import APIException


# TODO: Sort out Exception handling - maybe here an APIException might be fine - but it only should be raised when something goes wrong api wise - NOT when integrity errors or empty lists come back.
# OR no exception at all??
class DydxV4Fetcher(DydxFetcher):
def __init__(
self,
Expand All @@ -37,14 +40,18 @@ async def fetch(
start_date: datetime,
end_date: datetime,
) -> List[Candle]:
response: Dict[
str,
Any,
] = await self._client.markets.get_perpetual_market_candles( # type: ignore
market=market,
resolution=resolution.notation,
from_iso=datetime_to_dydx_iso_str(start_date),
to_iso=datetime_to_dydx_iso_str(end_date),
)
assert response["candles"]
try:
response: Dict[
str,
Any,
] = await self._client.markets.get_perpetual_market_candles( # type: ignore
market=market,
resolution=resolution.notation,
from_iso=datetime_to_dydx_iso_str(start_date),
to_iso=datetime_to_dydx_iso_str(end_date),
)

except Exception as e:
raise APIException(self._exchange, market, resolution, e)

return self._mapper.to_candles(response["candles"])
56 changes: 29 additions & 27 deletions locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from datetime import datetime
from typing import List
from typing import List, Tuple


from locast.candle.candle import Candle

from locast.candle.candle_utility import CandleUtility as cu
from locast.candle.exchange import Exchange
from locast.candle.resolution import ResolutionDetail
from locast.candle_fetcher.api_exception import APIException
from locast.candle_fetcher.candle_fetcher import CandleFetcher
from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher
from locast.logging import log_progress
from locast.candle_fetcher.exceptions import APIException
from locast.logging import log_integrity_violations, log_progress


class DydxCandleFetcher(CandleFetcher):
Expand Down Expand Up @@ -49,9 +49,9 @@ async def fetch_candles(
candles: List[Candle] = []

temp_end_date = end_date
iteration = 0

total, done = self._create_log_vars(resolution, start_date, end_date)
total = cu.amount_of_candles_in_range(start_date, end_date, resolution)
done = 0
violations: List[Tuple[Candle, Candle]] = []

try:
while (not candles) or candles[-1].started_at > start_date:
Expand All @@ -62,16 +62,32 @@ async def fetch_candles(
temp_end_date,
)

candles.extend(candle_batch)
temp_end_date = candles[-1].started_at
iteration += 1
if len(candle_batch) > 0:
for violation in cu.find_integrity_violations(candle_batch):
violations.append(violation)
total -= 1

if self._log_progress:
done += len(candle_batch)
log_progress("🚛", "candles", "fetched", done, total)

candles.extend(candle_batch)
temp_end_date = candles[-1].started_at

if self._log_progress:
done += len(candle_batch)
log_progress("🚛", "candles", "fetched", done, total)
else:
break

if len(violations) > 0:
log_integrity_violations(
"🚨",
self._exchange,
market,
resolution,
violations,
)

except Exception as e:
raise APIException(self._exchange, market, resolution, e) from e
raise APIException(self._exchange, market, resolution, e)

return candles

Expand Down Expand Up @@ -122,17 +138,3 @@ async def fetch_candles_up_to_now(
temp_now_minus_res = cu.subtract_n_resolutions(temp_norm_now, resolution, 1)

return candles

def _create_log_vars(
self,
resolution: ResolutionDetail,
start_date: datetime,
end_date: datetime,
):
if self._log_progress:
total = cu.amount_of_candles_in_range(start_date, end_date, resolution)
done = 0
else:
done = 0
total = 0
return total, done
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ def __init__(
market: str,
resolution: ResolutionDetail,
exception: Exception,
message: str | None = None,
) -> None:
if message is None:
message = f"{exchange.name}: Error fetching market data for market '{market}' and resolution '{resolution.notation}: {exception}'."
super().__init__(message)
msg = f"{exchange.name}: Error fetching market data for market '{market}' and resolution '{resolution.notation}: {exception}'."
super().__init__(msg)
22 changes: 22 additions & 0 deletions locast/logging.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from typing import List, Tuple

from locast.candle.candle import Candle
from locast.candle.exchange import Exchange
from locast.candle.resolution import ResolutionDetail


def log_progress(
emoji: str,
group_name: str,
Expand All @@ -12,5 +19,20 @@ def log_progress(
print(f"{emoji} {progress_message}", end="\r", flush=True)


def log_integrity_violations(
emoji: str,
exchange: Exchange,
market: str,
resolution: ResolutionDetail,
violations: List[Tuple[Candle, Candle]],
) -> None:
n = len(violations)
v = "integrity violations" if n > 1 else "integrity violation"
detail = f"{market}, {resolution.notation}"
print(f"{emoji} Attention: {exchange.name} delivered {n} {v} for {detail} {emoji}")
for v in violations:
print(f" ❌ {v[0].started_at} - {v[1].started_at}")


def log_redundant_call(emoji: str, message: str) -> None:
print(f"{emoji} {message}")
13 changes: 8 additions & 5 deletions notebooks/store_manger_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
"source": [
"from datetime import timedelta\n",
"import time\n",
"\n",
"from dydx3 import Client # type: ignore\n",
"from dydx3.constants import API_HOST_MAINNET # type: ignore\n",
"from dydx_v4_client.indexer.rest.indexer_client import IndexerClient # type: ignore\n",
"from dydx_v4_client.network import make_mainnet # type: ignore\n",
"from locast.candle_fetcher.dydx.api_fetcher.dydx_v3_fetcher import DydxV3Fetcher\n",
"from sqlalchemy import create_engine # type: ignore\n",
"\n",
"\n",
Expand All @@ -40,14 +42,15 @@
" node_url=\"dydx-ops-rpc.kingnodes.com:443\",\n",
")\n",
"\n",
"DELETE_CLUSTER = False\n",
"DELETE_CLUSTER = True\n",
"\n",
"\n",
"# TODO: Duplicate file and do same stuff with v3 or pass store manager to example func?\n",
"async def dydx_v4_example() -> None:\n",
" # Create store manager.\n",
" engine = create_engine(\"sqlite:///locast.db\")\n",
" candle_storage = SqliteCandleStorage(engine, log_progress=True)\n",
"\n",
" dydx_v3_fetcher = DydxV3Fetcher(Client(host=API_HOST_MAINNET))\n",
" dydx_v4_fetcher = DydxV4Fetcher(IndexerClient(MAINNET.rest_indexer))\n",
" candle_fetcher = DydxCandleFetcher(dydx_v4_fetcher, log_progress=True)\n",
"\n",
Expand Down Expand Up @@ -79,7 +82,7 @@
" # If not, update cluster.\n",
" eth_info = await manager.get_cluster_info(exchange, market, resolution)\n",
" if not eth_info.is_uptodate:\n",
" print(\"🔄 Updating cluster...\")\n",
" print(\"\\n🔄 Updating cluster...\")\n",
" start = time.time()\n",
" await manager.update_cluster(exchange, market, resolution)\n",
" print(f\"✅ Cluster updated in {round(time.time() - start, 2)} seconds.\")\n",
Expand All @@ -92,7 +95,7 @@
"\n",
" # Set DELETE flag at the top to True, in order to try out deleting the eth candle cluster\n",
" if DELETE_CLUSTER:\n",
" print(\"🗑️ Deleting cluster...\")\n",
" print(\"\\n🗑️ Deleting cluster...\")\n",
" start = time.time()\n",
" await manager.delete_cluster(exchange, market, resolution)\n",
" print(f\"✅ Cluster deleted in {round(time.time() - start, 2)} seconds.\")\n",
Expand Down
19 changes: 12 additions & 7 deletions tests/candle/test_candle_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,31 @@ def test_is_newest_valid_candles_returns_false() -> None:
assert uc.is_newest_valid_candle(candle) is False


def test_assert_chronological_order_returns_true(
def test_find_integrity_violations_returns_empty(
dydx_v4_eth_one_min_mock_candles: List[Candle],
) -> None:
# given
candles = dydx_v4_eth_one_min_mock_candles

# when & then
uc.assert_chronologic_order(candles)
# when
violations = uc.find_integrity_violations(candles)

# then
assert len(violations) == 0

def test_assert_chronological_order_returns_false(

def test_find_integrity_violations_returns_violations(
dydx_v4_eth_one_min_mock_candles: List[Candle],
) -> None:
# given
candles = dydx_v4_eth_one_min_mock_candles
faulty_candles = [candles[0], candles[1], candles[3], candles[4]]

# when & then
with pytest.raises(AssertionError):
uc.assert_chronologic_order(faulty_candles)
# when
violations = uc.find_integrity_violations(faulty_candles)

# then
assert len(violations) > 0


def test_assert_candle_unity_returns_true(
Expand Down
Loading

0 comments on commit c7fab1b

Please sign in to comment.