Skip to content

Commit

Permalink
Implement first pass of data horizon check when creating cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
sirEven committed Nov 5, 2024
1 parent 98992a6 commit c711b59
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 22 deletions.
5 changes: 5 additions & 0 deletions locast/candle/candle_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,10 @@ def missing_dates_between(
current_date += timedelta(seconds=resolution.seconds)
return missing_dates

@classmethod
def midpoint(cls, start: datetime, end: datetime) -> datetime:
    """Return the point in time lying exactly halfway between start and end."""
    half_span = (end - start) / 2
    return start + half_span


# TODO: Check what functions are not needed anymore.
6 changes: 6 additions & 0 deletions locast/candle_fetcher/candle_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ async def fetch_candles_up_to_now(
resolution: ResolutionDetail,
start_date: datetime,
) -> List[Candle]: ...

async def find_horizon(
    self,
    market: str,
    resolution: ResolutionDetail,
) -> datetime:
    """Return the horizon for the given market and resolution.

    The horizon is the date up to which candles reach back on the exchange,
    i.e. the start date of the oldest candle that can be fetched.
    """
    ...
63 changes: 60 additions & 3 deletions locast/candle_fetcher/dydx/candle_fetcher/dydx_candle_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from locast.candle_fetcher.candle_fetcher import CandleFetcher
from locast.candle_fetcher.dydx.api_fetcher.dydx_fetcher import DydxFetcher
from locast.candle_fetcher.exceptions import APIException
from locast.logging import log_missing_candles, log_progress
from locast.logging_functions import log_missing_candles, log_progress


class DydxCandleFetcher(CandleFetcher):
Expand Down Expand Up @@ -55,7 +55,6 @@ async def fetch_candles(
candles to be created on the exchange, those candles will not be fetched.
"""
candles: List[Candle] = []

temp_end_date = end_date
total = cu.amount_of_candles_in_range(start_date, end_date, resolution)
done = 0
Expand Down Expand Up @@ -89,7 +88,7 @@ async def fetch_candles(
candles.extend(candle_batch)
temp_end_date = candles[-1].started_at

# TODO: See if ordering total_missing_candles makes sense.
# TODO: See if ordering total_missing_candles might make sense.
if total_missing_candles:
log_missing_candles(
"🚨",
Expand Down Expand Up @@ -152,6 +151,64 @@ async def fetch_candles_up_to_now(

return candles

# TODO: Missing candles can throw this search off (a gap looks like the horizon) - but chances are tiny...
async def find_horizon(self, market: str, resolution: ResolutionDetail) -> datetime:
    """Find the start date of the oldest candle the exchange offers.

    The search probes single-candle windows in three phases:

    1. Exponential back-off: starting 1000 resolutions before now, the
       distance is doubled until a probe comes back empty.
    2. Binary search between the last empty probe (lower bound) and the last
       successful probe (upper bound) until both are at most one resolution
       apart.
    3. A final fetch at the upper bound, whose candle is returned.

    An empty probe is always interpreted as "older than the horizon", so a
    gap in the exchange's data can be mistaken for the horizon.

    Args:
        market: Market identifier passed through to the fetcher.
        resolution: Candle resolution to search with.

    Returns:
        ``started_at`` of the oldest candle that was found.

    Raises:
        IndexError: If the confirming fetch at the upper bound yields no
            candle. The type matches what the unguarded ``[0]`` lookup used
            to raise, but now carries an explanatory message.
    """
    now = cu.normalized_now(resolution)
    step = 1000  # TODO: Optimize this value (depending on resolution)
    upper_bound = now

    # Step 1: Exponential back-off. Each probe covers the single-candle range
    # [lower_bound, lower_bound + resolution], measured back from `now`.
    while True:
        lower_bound = cu.subtract_n_resolutions(now, resolution, step)
        probe = await self._fetcher.fetch(
            market,
            resolution,
            lower_bound,
            cu.add_one_resolution(lower_bound, resolution),
        )
        if not probe:
            # Gone too far: this date stays the lower bound for step 2.
            break
        upper_bound = lower_bound  # Candle exists, move the upper bound back.
        step *= 2

    # Step 2: Binary search within [lower_bound, upper_bound].
    while (upper_bound - lower_bound).total_seconds() > resolution.seconds:
        mid_point = cu.norm_date(cu.midpoint(lower_bound, upper_bound), resolution)
        probe = await self._fetcher.fetch(
            market,
            resolution,
            mid_point,
            cu.add_one_resolution(mid_point, resolution),
        )
        if probe:
            # Candle exists: the horizon is at or before mid_point.
            upper_bound = mid_point
        else:
            # Nothing there: the horizon is after mid_point.
            lower_bound = mid_point

    # Step 3: Final fetch to confirm the oldest available candle.
    oldest = await self._fetcher.fetch(
        market,
        resolution,
        upper_bound,
        cu.add_one_resolution(upper_bound, resolution),
    )
    if not oldest:
        raise IndexError(
            f"No candle found for {market} at {upper_bound}; "
            "unable to determine the horizon."
        )

    return oldest[0].started_at

def _detect_missing_in_batch(
self,
candle_batch: List[Candle],
Expand Down
2 changes: 1 addition & 1 deletion locast/candle_storage/sql/sqlite_candle_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)

from locast.candle_storage.sql.table_utility import TableUtility as tu
from locast.logging import log_progress
from locast.logging_functions import log_progress


class SqliteCandleStorage(CandleStorage):
Expand Down
1 change: 1 addition & 0 deletions locast/logging.py → locast/logging_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from locast.candle.exchange_resolution import ResolutionDetail


# TODO: Also print market
def log_progress(
emoji: str,
group_name: str,
Expand Down
54 changes: 53 additions & 1 deletion locast/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,60 @@
import asyncio

from dydx_v4_client.indexer.rest.indexer_client import IndexerClient # type: ignore
from dydx_v4_client.network import make_mainnet # type: ignore
from locast.candle.dydx.dydx_resolution import DydxResolution
from locast.candle_fetcher.dydx.candle_fetcher.dydx_candle_fetcher import (
DydxCandleFetcher,
)
from locast.candle_fetcher.dydx.api_fetcher.dydx_v4_fetcher import DydxV4Fetcher
from locast.candle.candle_utility import CandleUtility as cu

# dYdX v4 mainnet network configuration: indexer REST and WebSocket endpoints
# plus the node URL. Only `MAINNET.rest_indexer` is read in this module.
MAINNET = make_mainnet( # type: ignore
    rest_indexer="https://indexer.dydx.trade/",
    websocket_indexer="wss://indexer.dydx.trade/v4/ws",
    node_url="dydx-ops-rpc.kingnodes.com:443",
)


async def main() -> None:
    """Look up the candle horizon of DOGE-USD (15 minute candles) and print
    the candles fetched at the horizon and in the two windows before it."""
    market = "DOGE-USD"
    res = DydxResolution.FIFTEEN_MINUTES
    fetcher = DydxCandleFetcher(DydxV4Fetcher(IndexerClient(MAINNET.rest_indexer)))

    horizon = await fetcher.find_horizon(market, res)
    print(f"horizon: {horizon}")

    horizon_plus_one = cu.add_one_resolution(horizon, res)
    horizon_minus_one = cu.subtract_n_resolutions(horizon, res, 1)
    horizon_minus_two = cu.subtract_n_resolutions(horizon, res, 2)

    # Consecutive one-resolution windows, walking backwards from the horizon.
    windows = (
        ("Oldest:", horizon, horizon_plus_one),
        ("One older:", horizon_minus_one, horizon),
        ("Two older:", horizon_minus_two, horizon_minus_one),
    )

    # All three fetches complete before any result is printed.
    results = []
    for label, window_start, window_end in windows:
        candles = await fetcher.fetch_candles(market, res, window_start, window_end)
        results.append((label, candles))

    for label, candles in results:
        print(label)
        print(candles)


# Script entry point: runs main() to completion on a new event loop.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so merely
# importing this module also executes main().
asyncio.run(main())
30 changes: 28 additions & 2 deletions locast/store_manager/store_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import List
from typing import Dict, List

from locast.candle.candle_utility import CandleUtility as cu
from locast.candle.candle import Candle
Expand All @@ -8,7 +8,12 @@
from locast.candle_fetcher.candle_fetcher import CandleFetcher
from locast.candle_storage.cluster_info import ClusterInfo
from locast.candle_storage.candle_storage import CandleStorage
from locast.logging import log_redundant_call
from locast.logging_functions import log_redundant_call

# TODO: Here we could call find_horizon, correct the start date accordingly and print a corresponding message right at the beginning.
# If horizon <= start_date, all is fine and no correction is necessary. Otherwise, the user is overfetching into the past, hence we replace start_date with horizon.
# NOTE: StoreManager might be a valid place for this, as it allows clean caching of determined horizons, providing them for its lifetime. Then we could
# add find_horizon to the CandleFetcher Protocol. It would also only ever need to be called and cached at the beginning of create_cluster().


class StoreManager:
Expand All @@ -20,6 +25,8 @@ def __init__(
self._candle_fetcher = candle_fetcher
self._candle_storage = candle_storage

self._horizon_cache: Dict[str, datetime] = {}

async def create_cluster(
self,
market: str,
Expand All @@ -46,6 +53,8 @@ async def create_cluster(
resolution,
)

start_date = await self._check_horizon(market, resolution, start_date)

cluster = await self._candle_fetcher.fetch_candles_up_to_now(
market,
resolution,
Expand Down Expand Up @@ -126,6 +135,23 @@ async def get_cluster_info(

# TODO: Expand API by adding convenience methods such as def retrieve_segment(from, to, exchange, market, resolution) -> List[Candle]: ...

async def _check_horizon(
    self,
    market: str,
    resolution: ResolutionDetail,
    start_date: datetime,
) -> datetime:
    """Clamp start_date to the data horizon of the given market and resolution.

    The horizon is requested from the candle fetcher the first time a
    market/resolution pair is seen and served from ``_horizon_cache`` after
    that.

    Args:
        market: Market identifier.
        resolution: Candle resolution; its ``notation`` is part of the cache key.
        start_date: Start date requested by the caller.

    Returns:
        ``start_date`` if it is not older than the horizon, otherwise the
        horizon itself (after printing a notice).
    """
    cache_key = f"{market}_{resolution.notation}"

    horizon = self._horizon_cache.get(cache_key)
    if horizon is None:
        horizon = await self._candle_fetcher.find_horizon(market, resolution)
        self._horizon_cache[cache_key] = horizon

    if start_date < horizon:
        ex = self._candle_fetcher.exchange.name
        print(f"Attention: Candles on {ex} only reach back to {horizon}.")
        print(f"Start date ({start_date}) will be moved, to prevent conflicts.")
        start_date = horizon
    return start_date


class ExistingClusterException(Exception):
def __init__(self, message: str) -> None:
Expand Down
20 changes: 10 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 20 additions & 4 deletions tests/helper/candle_mockery/dydx_candle_backend_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
mock_dydx_candle_dict_batch,
)

from sir_utilities.date_time import string_to_datetime


class DydxCandleBackendMock(CandleBackendMock):
_instance = None
Expand All @@ -24,22 +26,28 @@ def __init__(
missing_random_candles: int = 0,
missing_candles_on_batch_newest_edge: int = 0,
) -> None:
properties = {
if hasattr(self, "_initialized"):
return
deletion_properties = {
"_missing_random_candles": missing_random_candles,
"_missing_candles_on_batch_newest_edge": missing_candles_on_batch_newest_edge,
}
try:
for prop, value in properties.items():
for prop, value in deletion_properties.items():
if not hasattr(self, prop):
setattr(self, prop, value)
# Initialize corresponding deleted flag
deleted_prop = f"{prop}_deleted"
setattr(self, deleted_prop, False)

except AttributeError as e:
print(f"Error setting attribute: {e}")
except Exception as e:
print(f"Unexpected error: {e}")

self._horizon = string_to_datetime("2024-01-01T00:00:00.000Z")
self._initialized = True

@property
def missing_random_candles(self) -> int:
    """Read-only access to the stored ``_missing_random_candles`` value."""
    configured = self._missing_random_candles
    return configured
Expand Down Expand Up @@ -78,7 +86,10 @@ def mock_candles(
self._delete_random_candles(candle_dicts_batch)

if self._missing_candles_on_batch_newest_edge:
self._delete_on_batch_batch_newest_edge(candle_dicts_batch)
self._delete_on_batch_newest_edge(candle_dicts_batch)

if self._horizon_reached(candle_dicts_batch):
return {"candles": []}

return {"candles": candle_dicts_batch}

Expand All @@ -92,8 +103,13 @@ def _delete_random_candles(self, batch: List[Dict[str, Any]]) -> None:
del batch[random.randint(10, 20)]
self._missing_random_candles_deleted = True

def _delete_on_batch_batch_newest_edge(self, batch: List[Dict[str, Any]]):
def _delete_on_batch_newest_edge(self, batch: List[Dict[str, Any]]):
if not self._missing_candles_on_batch_newest_edge_deleted:
for _ in range(self._missing_candles_on_batch_newest_edge):
del batch[0]
self._missing_candles_on_batch_newest_edge_deleted = True

def _horizon_reached(self, batch: List[Dict[str, Any]]) -> bool:
return any(
string_to_datetime(candle["startedAt"]) < self._horizon for candle in batch
)
1 change: 1 addition & 0 deletions tests/helper/candle_mockery/v4_indexer_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from tests.helper.candle_mockery.dydx_candle_backend_mock import DydxCandleBackendMock


# TODO: Implement horizon - meaning: After n returns, return []
class V4MarketsClientMock(MarketsClient):
def __init__(self) -> None:
pass
Expand Down
2 changes: 1 addition & 1 deletion tests/logging/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List
import pytest
from locast.logging import log_progress
from locast.logging_functions import log_progress


def test_log_progress_prints_correctly(
Expand Down
Loading

0 comments on commit c711b59

Please sign in to comment.