Skip to content

refactor: mv class and interface #246

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions BREAKING_CHANGES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Breaking changes
## 0.2.0-beta60
- `MarketDataCache` moved into [tinkoff/invest/caching/market_data_cache/cache.py](tinkoff/invest/caching/market_data_cache/cache.py).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `MarketDataCache` moved into [tinkoff/invest/caching/market_data_cache/cache.py](tinkoff/invest/caching/market_data_cache/cache.py).
- `MarketDataCache` was moved to [tinkoff/invest/caching/market_data_cache/cache.py](tinkoff/invest/caching/market_data_cache/cache.py).

- The correct import is now `from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache` (whereas previously it was `from tinkoff.invest.services import MarketDataCache`).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Correct import is now `from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache` (whereas previously was `from tinkoff.invest.services import MarketDataCache`).
- The correct import is now `from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache` instead of `from tinkoff.invest.services import MarketDataCache`.

- The import in [download_all_candles.py](examples/download_all_candles.py) was also corrected accordingly.
2 changes: 1 addition & 1 deletion examples/download_all_candles.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from pathlib import Path

from tinkoff.invest import CandleInterval, Client
from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache
from tinkoff.invest.caching.market_data_cache.cache_settings import (
MarketDataCacheSettings,
)
from tinkoff.invest.services import MarketDataCache
from tinkoff.invest.utils import now

TOKEN = os.environ["INVEST_TOKEN"]
Expand Down
3 changes: 2 additions & 1 deletion tests/data_loaders/test_cached_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
HistoricCandle,
Quotation,
)
from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache
from tinkoff.invest.caching.market_data_cache.cache_settings import (
FileMetaData,
MarketDataCacheSettings,
Expand All @@ -22,7 +23,7 @@
from tinkoff.invest.caching.market_data_cache.instrument_market_data_storage import (
InstrumentMarketDataStorage,
)
from tinkoff.invest.services import MarketDataCache, MarketDataService
from tinkoff.invest.services import MarketDataService
irusland marked this conversation as resolved.
Show resolved Hide resolved
from tinkoff.invest.utils import (
candle_interval_to_timedelta,
ceil_datetime,
Expand Down
143 changes: 143 additions & 0 deletions tinkoff/invest/caching/market_data_cache/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import logging
from datetime import datetime, timedelta
from typing import Dict, Generator, Iterable, Optional, Tuple

from tinkoff.invest import CandleInterval, HistoricCandle
from tinkoff.invest.caching.market_data_cache.cache_settings import (
MarketDataCacheSettings,
)
from tinkoff.invest.caching.market_data_cache.instrument_date_range_market_data import (
InstrumentDateRangeData,
)
from tinkoff.invest.caching.market_data_cache.instrument_market_data_storage import (
InstrumentMarketDataStorage,
)
from tinkoff.invest.candle_getter_interface import ICandleGetter
from tinkoff.invest.services import Services
from tinkoff.invest.utils import (
candle_interval_to_timedelta,
datetime_range_floor,
floor_datetime,
now,
with_filtering_distinct_candles,
)

logger = logging.getLogger(__name__)


class MarketDataCache(ICandleGetter):
    """Caching layer over the market data API.

    Serves historic candles from per-instrument file storage and fetches
    only the missing date ranges from the network, writing the fetched
    (complete) candles back into the cache.
    """

    def __init__(self, settings: MarketDataCacheSettings, services: Services):
        self._settings = settings
        # Ensure the cache directory exists before any storage touches it.
        self._settings.base_cache_dir.mkdir(parents=True, exist_ok=True)
        self._services = services
        # One storage per (figi, interval) pair, created lazily on first use.
        self._figi_cache_storages: Dict[
            Tuple[str, CandleInterval], InstrumentMarketDataStorage
        ] = {}

    def _get_candles_from_net(
        self, figi: str, interval: CandleInterval, from_: datetime, to: datetime
    ) -> Iterable[HistoricCandle]:
        """Stream candles for the given range directly from the API."""
        yield from self._services.get_all_candles(
            figi=figi,
            interval=interval,
            from_=from_,
            to=to,
        )

    def _with_saving_into_cache(
        self,
        storage: InstrumentMarketDataStorage,
        from_net: Iterable[HistoricCandle],
        net_range: Tuple[datetime, datetime],
        interval_delta: timedelta,
    ) -> Iterable[HistoricCandle]:
        """Yield candles fetched from the net, persisting the complete ones.

        Only complete candles are cached, and the cached range end is
        floored to the interval so a partially elapsed interval is never
        recorded as already cached.
        """
        candles = list(from_net)
        if candles:
            filtered_net_range = self._round_net_range(net_range, interval_delta)
            filtered_candles = list(self._filter_complete_candles(candles))
            storage.update(
                [
                    InstrumentDateRangeData(
                        date_range=filtered_net_range, historic_candles=filtered_candles
                    )
                ]
            )
            logger.debug("From net [\n%s\n%s\n]", str(net_range[0]), str(net_range[1]))
            logger.debug(
                "Filtered net [\n%s\n%s\n]",
                str(filtered_net_range[0]),
                str(filtered_net_range[1]),
            )
            # Guard: if every fetched candle was incomplete, filtered_candles
            # is empty and min()/max() would raise ValueError.
            if filtered_candles:
                # Single pass over the times instead of two map/lambda/list
                # traversals (fixes the C417 lint the review flagged).
                candle_times = [candle.time for candle in filtered_candles]
                logger.debug(
                    "Filtered net real [\n%s\n%s\n]",
                    str(min(candle_times)),
                    str(max(candle_times)),
                )

        yield from candles

    def _filter_complete_candles(
        self, candles: Iterable[HistoricCandle]
    ) -> Iterable[HistoricCandle]:
        """Keep only candles whose interval has fully elapsed."""
        return (candle for candle in candles if candle.is_complete)

    @with_filtering_distinct_candles # type: ignore
    def get_all_candles(
        self,
        *,
        from_: datetime,
        to: Optional[datetime] = None,
        interval: CandleInterval = CandleInterval(0),
        figi: str = "",
    ) -> Generator[HistoricCandle, None, None]:
        """Yield candles for ``figi`` in ``[from_, to]`` using the cache.

        Cached sub-ranges are yielded as-is; gaps between them (and the
        tail after the last cached range) are fetched from the network and
        written back into the cache.
        """
        interval_delta = candle_interval_to_timedelta(interval)
        to = to or now()
        from_, to = datetime_range_floor((from_, to))
        logger.debug("Request [\n%s\n%s\n]", str(from_), str(to))

        processed_time = from_
        figi_cache_storage = self._get_figi_cache_storage(figi=figi, interval=interval)
        for cached in figi_cache_storage.get(request_range=(from_, to)):
            cached_start, cached_end = cached.date_range
            cached_candles = list(cached.historic_candles)
            if cached_start > processed_time:
                # Fill the gap before this cached range from the network.
                yield from self._with_saving_into_cache(
                    storage=figi_cache_storage,
                    from_net=self._get_candles_from_net(
                        figi, interval, processed_time, cached_start
                    ),
                    net_range=(processed_time, cached_start),
                    interval_delta=interval_delta,
                )
            logger.debug(
                "Returning from cache [\n%s\n%s\n]", str(cached_start), str(cached_end)
            )

            yield from cached_candles
            processed_time = cached_end

        if processed_time + interval_delta <= to:
            # Tail gap after the last cached range.
            yield from self._with_saving_into_cache(
                storage=figi_cache_storage,
                from_net=self._get_candles_from_net(figi, interval, processed_time, to),
                net_range=(processed_time, to),
                interval_delta=interval_delta,
            )

    def _get_figi_cache_storage(
        self, figi: str, interval: CandleInterval
    ) -> InstrumentMarketDataStorage:
        """Return the storage for ``(figi, interval)``, creating it lazily."""
        key = (figi, interval)
        if key not in self._figi_cache_storages:
            self._figi_cache_storages[key] = InstrumentMarketDataStorage(
                figi=figi, interval=interval, settings=self._settings
            )
        # Direct dict access avoids the intermediate variable (R504).
        return self._figi_cache_storages[key]

    def _round_net_range(
        self, net_range: Tuple[datetime, datetime], interval_delta: timedelta
    ) -> Tuple[datetime, datetime]:
        """Floor the range end to the interval; the start is kept as-is."""
        start, end = net_range
        return start, floor_datetime(end, interval_delta)
18 changes: 18 additions & 0 deletions tinkoff/invest/candle_getter_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import abc
from datetime import datetime
from typing import Generator, Optional


class ICandleGetter(abc.ABC):
    """Abstract source of historic candles.

    Implementations stream ``HistoricCandle`` objects for one instrument
    (``figi``) over ``[from_, to]`` at the requested ``interval``.
    """

    @abc.abstractmethod
    def get_all_candles(
        self,
        *,
        from_: datetime,
        to: Optional[datetime],
        interval: "CandleInterval",  # type: ignore # noqa: F821 Undefined name
        figi: str,
    ) -> Generator[  # type: ignore
        "HistoricCandle", None, None  # noqa: F821 Undefined name
    ]:
        """Yield candles for ``figi`` in the requested range."""
        ...
154 changes: 4 additions & 150 deletions tinkoff/invest/services.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
# pylint:disable=redefined-builtin,too-many-lines
import abc
import logging
from datetime import datetime, timedelta
from typing import Dict, Generator, Iterable, List, Optional, Tuple
from datetime import datetime
from typing import Generator, Iterable, List, Optional

import grpc
from deprecation import deprecated

from . import _grpc_helpers
from ._errors import handle_request_error, handle_request_error_gen
from .caching.market_data_cache.cache_settings import MarketDataCacheSettings
from .caching.market_data_cache.instrument_date_range_market_data import (
InstrumentDateRangeData,
)
from .caching.market_data_cache.instrument_market_data_storage import (
InstrumentMarketDataStorage,
)
from .candle_getter_interface import ICandleGetter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

давай это лучше как Protocol опишем и уберем явное наследование от абстрактного класса. Плюс убрать I, это вроде как плохая практика

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в общем в этом модуле вообще не должно быть импортом из других мест

from .grpc import (
instruments_pb2,
instruments_pb2_grpc,
Expand Down Expand Up @@ -166,14 +159,7 @@
WithdrawLimitsResponse,
)
from .typedefs import AccountId
from .utils import (
candle_interval_to_timedelta,
datetime_range_floor,
floor_datetime,
get_intervals,
now,
with_filtering_distinct_candles,
)
from .utils import get_intervals, now

__all__ = (
"Services",
Expand All @@ -187,142 +173,10 @@
"UsersService",
"SandboxService",
"StopOrdersService",
"MarketDataCache",
)
logger = logging.getLogger(__name__)


class ICandleGetter(abc.ABC):
    """Abstract source of historic candles.

    NOTE(review): legacy in-module copy — this PR relocates the interface
    to tinkoff/invest/candle_getter_interface.py.
    """

    @abc.abstractmethod
    def get_all_candles(
        self,
        *,
        from_: datetime,
        to: Optional[datetime],
        interval: CandleInterval,
        figi: str,
    ) -> Generator[HistoricCandle, None, None]:
        # Abstract: implementations must yield candles for figi in the range.
        pass


class MarketDataCache(ICandleGetter):
    """Caching layer over the market data API.

    Serves historic candles from per-instrument file storage and fetches
    only the missing date ranges from the network, writing the fetched
    (complete) candles back into the cache.

    NOTE(review): legacy in-module copy — this PR relocates the class to
    tinkoff/invest/caching/market_data_cache/cache.py.
    """

    def __init__(self, settings: MarketDataCacheSettings, services: "Services"):
        self._settings = settings
        # Ensure the cache directory exists before any storage touches it.
        self._settings.base_cache_dir.mkdir(parents=True, exist_ok=True)
        self._services = services
        # One storage per (figi, interval) pair, created lazily on first use.
        self._figi_cache_storages: Dict[
            Tuple[str, CandleInterval], InstrumentMarketDataStorage
        ] = {}

    def _get_candles_from_net(
        self, figi: str, interval: CandleInterval, from_: datetime, to: datetime
    ) -> Iterable[HistoricCandle]:
        """Stream candles for the given range directly from the API."""
        yield from self._services.get_all_candles(
            figi=figi,
            interval=interval,
            from_=from_,
            to=to,
        )

    def _with_saving_into_cache(
        self,
        storage: InstrumentMarketDataStorage,
        from_net: Iterable[HistoricCandle],
        net_range: Tuple[datetime, datetime],
        interval_delta: timedelta,
    ) -> Iterable[HistoricCandle]:
        """Yield candles fetched from the net, persisting the complete ones.

        Only complete candles are cached, and the cached range end is
        floored to the interval so a partially elapsed interval is never
        recorded as already cached.
        """
        candles = list(from_net)
        if candles:
            filtered_net_range = self._round_net_range(net_range, interval_delta)
            filtered_candles = list(self._filter_complete_candles(candles))
            storage.update(
                [
                    InstrumentDateRangeData(
                        date_range=filtered_net_range, historic_candles=filtered_candles
                    )
                ]
            )
            logger.debug("From net [\n%s\n%s\n]", str(net_range[0]), str(net_range[1]))
            logger.debug(
                "Filtered net [\n%s\n%s\n]",
                str(filtered_net_range[0]),
                str(filtered_net_range[1]),
            )
            # Guard: if every fetched candle was incomplete, filtered_candles
            # is empty and min()/max() would raise ValueError.
            if filtered_candles:
                # Single pass over the times instead of two map/lambda/list
                # traversals (fixes the C417 lint the review flagged).
                candle_times = [candle.time for candle in filtered_candles]
                logger.debug(
                    "Filtered net real [\n%s\n%s\n]",
                    str(min(candle_times)),
                    str(max(candle_times)),
                )

        yield from candles

    def _filter_complete_candles(
        self, candles: Iterable[HistoricCandle]
    ) -> Iterable[HistoricCandle]:
        """Keep only candles whose interval has fully elapsed."""
        return (candle for candle in candles if candle.is_complete)

    @with_filtering_distinct_candles # type: ignore
    def get_all_candles(
        self,
        *,
        from_: datetime,
        to: Optional[datetime] = None,
        interval: CandleInterval = CandleInterval(0),
        figi: str = "",
    ) -> Generator[HistoricCandle, None, None]:
        """Yield candles for ``figi`` in ``[from_, to]`` using the cache.

        Cached sub-ranges are yielded as-is; gaps between them (and the
        tail after the last cached range) are fetched from the network and
        written back into the cache.
        """
        interval_delta = candle_interval_to_timedelta(interval)
        to = to or now()
        from_, to = datetime_range_floor((from_, to))
        logger.debug("Request [\n%s\n%s\n]", str(from_), str(to))

        processed_time = from_
        figi_cache_storage = self._get_figi_cache_storage(figi=figi, interval=interval)
        for cached in figi_cache_storage.get(request_range=(from_, to)):
            cached_start, cached_end = cached.date_range
            cached_candles = list(cached.historic_candles)
            if cached_start > processed_time:
                # Fill the gap before this cached range from the network.
                yield from self._with_saving_into_cache(
                    storage=figi_cache_storage,
                    from_net=self._get_candles_from_net(
                        figi, interval, processed_time, cached_start
                    ),
                    net_range=(processed_time, cached_start),
                    interval_delta=interval_delta,
                )
            logger.debug(
                "Returning from cache [\n%s\n%s\n]", str(cached_start), str(cached_end)
            )

            yield from cached_candles
            processed_time = cached_end

        if processed_time + interval_delta <= to:
            # Tail gap after the last cached range.
            yield from self._with_saving_into_cache(
                storage=figi_cache_storage,
                from_net=self._get_candles_from_net(figi, interval, processed_time, to),
                net_range=(processed_time, to),
                interval_delta=interval_delta,
            )

    def _get_figi_cache_storage(
        self, figi: str, interval: CandleInterval
    ) -> InstrumentMarketDataStorage:
        """Return the storage for ``(figi, interval)``, creating it lazily."""
        key = (figi, interval)
        if key not in self._figi_cache_storages:
            self._figi_cache_storages[key] = InstrumentMarketDataStorage(
                figi=figi, interval=interval, settings=self._settings
            )
        # Direct dict access avoids the intermediate variable (R504).
        return self._figi_cache_storages[key]

    def _round_net_range(
        self, net_range: Tuple[datetime, datetime], interval_delta: timedelta
    ) -> Tuple[datetime, datetime]:
        """Floor the range end to the interval; the start is kept as-is."""
        start, end = net_range
        return start, floor_datetime(end, interval_delta)


class Services(ICandleGetter):
def __init__(
self,
Expand Down