Commit
Removed some of the old cache logic
altvod committed Jan 23, 2024
1 parent 6c0706a commit 9ef85d8
Showing 4 changed files with 16 additions and 178 deletions.
152 changes: 15 additions & 137 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
@@ -1,25 +1,21 @@
from __future__ import annotations

import abc
import logging
from typing import (
TYPE_CHECKING,
Collection,
Optional,
)

import attr
from sqlalchemy.exc import DatabaseError

from dl_constants.enums import DataSourceRole
from dl_core.data_processing.cache.exc import CachePreparationFailed
from dl_core.data_processing.cache.primitives import (
BIQueryCacheOptions,
CacheTTLConfig,
CacheTTLInfo,
DataKeyPart,
LocalKeyRepresentation,
)
from dl_core.query.bi_query import QueryAndResultInfo
from dl_core.serialization import hashable_dumps
from dl_core.us_connection_base import (
ConnectionBase,
@@ -28,24 +24,17 @@


if TYPE_CHECKING:
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql import Select

from dl_constants.enums import UserDataType
from dl_constants.types import TJSONExt
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_source.base import DataSource
from dl_core.us_dataset import Dataset
from dl_core.us_manager.local_cache import USEntryBuffer


LOGGER = logging.getLogger(__name__)


@attr.s
class CacheOptionsBuilderBase:
class CacheOptionsBuilderBase(abc.ABC):
default_ttl_config: CacheTTLConfig = attr.ib(factory=CacheTTLConfig)
_is_bleeding_edge_user: bool = attr.ib(default=False)

def get_actual_ttl_config(
self,
@@ -60,28 +49,6 @@ def get_actual_ttl_config(

return ctc

@staticmethod
def get_query_str_for_cache(query: Select, dialect: DefaultDialect) -> str:
try:
compiled_query = query.compile(dialect=dialect)
except DatabaseError as err:
raise CachePreparationFailed from err

if isinstance(compiled_query.params, dict):
ordered_params = sorted(
compiled_query.params.items(),
key=lambda item: item[0],
)
else:
ordered_params = compiled_query.params

return ";".join(
(
str(compiled_query),
str(ordered_params),
)
)

@staticmethod
def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
return CacheTTLInfo(
@@ -99,133 +66,44 @@ def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheT
)
return self.config_to_ttl_info(ttl_config=ttl_config)

def get_data_key(
self,
*,
query_res_info: QueryAndResultInfo,
from_info: Optional[PreparedFromInfo] = None,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> Optional[LocalKeyRepresentation]:
return base_key


@attr.s
class DatasetOptionsBuilder(CacheOptionsBuilderBase):
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
@abc.abstractmethod
def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
raise NotImplementedError


@attr.s
class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng package
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
return BIQueryCacheOptions(
cache_enabled=self.cache_enabled,
key=data_key,
cache_enabled=cache_enabled,
key=data_key if cache_enabled else None,
ttl_sec=ttl_info.ttl_sec,
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)

def get_data_key(
self,
*,
query_res_info: QueryAndResultInfo,
from_info: Optional[PreparedFromInfo] = None,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> Optional[LocalKeyRepresentation]:
# TODO: Remove after switching to new cache keys
compiled_query = self.get_query_str_for_cache(
query=query_res_info.query,
dialect=from_info.query_compiler.dialect,
)
return base_key.extend(part_type="query", part_content=compiled_query)


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):
_is_bleeding_edge_user: bool = attr.ib(default=False)
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng package
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
return self.cache_enabled


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder): # TODO: Rename to SourceDbCacheOptionsBuilder
def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
assert joint_dsrc_info.data_source_list is not None
cache_enabled = all(dsrc.cache_enabled for dsrc in joint_dsrc_info.data_source_list)
return cache_enabled

def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
"""Returns cache key, TTL for new entries, refresh TTL flag"""

ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
return BIQueryCacheOptions(
cache_enabled=cache_enabled,
key=data_key if cache_enabled else None,
ttl_sec=ttl_info.ttl_sec,
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)

def make_data_select_cache_key(
self,
from_info: PreparedFromInfo,
compiled_query: str,
user_types: list[UserDataType],
is_bleeding_edge_user: bool,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> LocalKeyRepresentation:
# TODO: Remove after switching to new cache keys,
# but put the db_name + target_connection.get_cache_key_part() parts somewhere
assert from_info.target_connection_ref is not None
target_connection = self._us_entry_buffer.get_entry(from_info.target_connection_ref)
assert isinstance(target_connection, ConnectionBase)
connection_id = target_connection.uuid
assert connection_id is not None

local_key_rep = base_key
local_key_rep = local_key_rep.extend(part_type="query", part_content=str(compiled_query))
local_key_rep = local_key_rep.extend(part_type="user_types", part_content=tuple(user_types or ()))
local_key_rep = local_key_rep.extend(
part_type="is_bleeding_edge_user",
part_content=is_bleeding_edge_user,
)

return local_key_rep

def get_data_key(
self,
*,
query_res_info: QueryAndResultInfo,
from_info: Optional[PreparedFromInfo] = None,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> Optional[LocalKeyRepresentation]:
# TODO: Remove after switching to new cache keys
compiled_query = self.get_query_str_for_cache(
query=query_res_info.query,
dialect=from_info.query_compiler.dialect,
)
data_key: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key(
base_key=base_key,
from_info=from_info,
compiled_query=compiled_query,
user_types=query_res_info.user_types,
is_bleeding_edge_user=self._is_bleeding_edge_user,
)
return data_key


@attr.s
class DashSQLCacheOptionsBuilder(CacheOptionsBuilderBase):
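Net effect of the hunks above: the per-subclass get_cache_options overrides collapse into the single shared implementation on DatasetOptionsBuilder, and subclasses now supply only the abstract get_cache_enabled hook. A minimal sketch of the resulting pattern (hypothetical subclass name; only interfaces visible in this diff are assumed):

    import attr

    from dl_core.data_processing.cache.utils import DatasetOptionsBuilder
    from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo


    @attr.s
    class MyCacheOptionsBuilder(DatasetOptionsBuilder):  # hypothetical subclass
        def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
            # Enable caching only if every underlying data source allows it,
            # mirroring SelectorCacheOptionsBuilder above.
            assert joint_dsrc_info.data_source_list is not None
            return all(dsrc.cache_enabled for dsrc in joint_dsrc_info.data_source_list)

The inherited get_cache_options then assembles BIQueryCacheOptions from this flag, passing key=None when caching is disabled.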
@@ -143,23 +143,6 @@ def _make_query_res_info(
)
return query_res_info

def get_data_key(
self,
*,
query: str | Select,
user_types: Sequence[UserDataType],
from_info: Optional[PreparedFromInfo] = None,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> Optional[LocalKeyRepresentation]:
# TODO: Remove this method
query_res_info = self._make_query_res_info(query=query, user_types=user_types)
data_key = self._cache_options_builder.get_data_key(
from_info=from_info,
query_res_info=query_res_info,
base_key=base_key,
)
return data_key

async def create_table(
self,
*,
@@ -173,32 +173,12 @@ def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
assert isinstance(op, CalcOp)
source_stream = self.ctx.get_stream(op.source_stream_id)

# TODO: Remove legacy version

# Legacy procedure
from_info = self.get_from_info_from_stream(source_stream=source_stream)
query_compiler = from_info.query_compiler
query = query_compiler.compile_select(
bi_query=op.bi_query,
# The info about the real source is already contained in the previous key parts,
# and, also, we want to avoid the randomized table names (in compeng) to appear in the key.
# So just use a fake table here.
sql_source=sa.table("table"),
)
legacy_data_key = self.db_ex_adapter.get_data_key(
query=query,
user_types=source_stream.user_types,
from_info=from_info,
base_key=source_stream.data_key,
)

# New procedure
new_data_key = source_stream.data_key.extend("query", op.data_key_data)

LOGGER.info(
f"Preliminary cache key info for query: "
f"legacy key: {legacy_data_key.key_parts_hash} ; "
f"new key: {new_data_key.key_parts_hash}"
f"key: {new_data_key.key_parts_hash}"
)

return new_data_key
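The legacy branch that compiled the SELECT against a fake table and hashed the SQL string is deleted; the key is now derived purely from the stream's data_key plus the operation's data_key_data. A rough sketch of that derivation (the string literal is a stand-in for op.data_key_data):

    from dl_core.data_processing.cache.primitives import LocalKeyRepresentation

    base_key = LocalKeyRepresentation()  # stands in for source_stream.data_key
    # The operation's own key data is appended as a "query" part, replacing
    # the compiled-SELECT string the legacy procedure hashed:
    new_data_key = base_key.extend(part_type="query", part_content="<op.data_key_data>")
    print(new_data_key.key_parts_hash)  # the hash reported by the log line above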
@@ -23,14 +23,11 @@ class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
_dataset: Dataset = attr.ib(kw_only=True)
_row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
_is_bleeding_edge_user: bool = attr.ib(default=False)
_default_cache_ttl_config: CacheTTLConfig = attr.ib(default=None)

def _make_cache_options_builder(self) -> DatasetOptionsBuilder:
return SelectorCacheOptionsBuilder(
default_ttl_config=self._default_cache_ttl_config,
is_bleeding_edge_user=self._is_bleeding_edge_user,
us_entry_buffer=self._us_entry_buffer,
)

def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
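With is_bleeding_edge_user and us_entry_buffer dropped from the builder's construction, the processor now wires it up from the TTL config alone. A minimal usage sketch (import paths assumed from the utils.py diff above):

    from dl_core.data_processing.cache.primitives import CacheTTLConfig
    from dl_core.data_processing.cache.utils import SelectorCacheOptionsBuilder

    # Only the TTL config is needed now; cache_enabled defaults to True.
    builder = SelectorCacheOptionsBuilder(default_ttl_config=CacheTTLConfig())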
