Implemented CachedTypedQueryProcessor (#372)
altvod authored Mar 7, 2024
1 parent 5644d9f commit b11a38f
Showing 12 changed files with 240 additions and 70 deletions.
10 changes: 3 additions & 7 deletions lib/dl_api_lib/dl_api_lib/app/data_api/resources/typed_query.py
@@ -16,10 +16,10 @@
)
from dl_api_lib.enums import USPermissionKind
from dl_api_lib.schemas.typed_query import (
DataRowsTypedQueryResultSchema,
PlainTypedQueryContentSchema,
RawTypedQuery,
RawTypedQueryParameter,
TypedQueryResultSchema,
TypedQuerySchema,
)
from dl_api_lib.service_registry.service_registry import ApiServiceRegistry
@@ -29,7 +29,6 @@
import dl_core.exc as core_exc
from dl_core.us_connection_base import ConnectionBase
from dl_dashsql.typed_query.primitives import (
DataRowsTypedQueryResult,
PlainTypedQuery,
TypedQuery,
TypedQueryParameter,
@@ -62,8 +61,7 @@ def load_typed_query(
parameters=tuple(
TypedQueryParameter(
name=param.name,
user_type=param.data_type,
value=param.value.value,
typed_value=param.value,
)
for param in parameters
),
@@ -75,11 +73,9 @@ class TypedQueryResultSerializer:
"""Serializes the result (meta and data)"""

def serialize_typed_query_result(self, typed_query_result: TypedQueryResult) -> Any:
# No other result types are supported in API:
assert isinstance(typed_query_result, DataRowsTypedQueryResult)
return {
"query_type": typed_query_result.query_type.name,
"data": DataRowsTypedQueryResultSchema().dump(typed_query_result),
"data": TypedQueryResultSchema().dump(typed_query_result),
}


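With this change the API layer serializes any TypedQueryResult through TypedQueryResultSchema instead of narrowing to DataRowsTypedQueryResult. A minimal sketch of the call, assuming the enum member and placeholder data shown here (they are not part of the diff):

    # Hypothetical sketch: build a result and serialize it for the API response.
    result = TypedQueryResult(
        query_type=typed_query.query_type,  # DashSQLQueryType member taken from the incoming query
        column_headers=(TypedQueryResultColumnHeader(name="n", user_type=UserDataType.integer),),
        data_rows=((1,),),
    )
    payload = TypedQueryResultSerializer().serialize_typed_query_result(result)
    # payload == {"query_type": result.query_type.name, "data": TypedQueryResultSchema().dump(result)}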
2 changes: 1 addition & 1 deletion lib/dl_api_lib/dl_api_lib/schemas/typed_query.py
@@ -52,7 +52,7 @@ class TypedQuerySchema(DefaultSchema[RawTypedQuery]):
parameters = ma_fields.List(ma_fields.Nested(TypedQueryParameterSchema), load_default=None)


class DataRowsTypedQueryResultSchema(BaseSchema):
class TypedQueryResultSchema(BaseSchema):
class ColumnHeaderSchema(BaseSchema):
name = ma_fields.String(required=True)
data_type = ma_fields.Enum(UserDataType, required=True, attribute="user_type")
@@ -1,17 +1,25 @@
from __future__ import annotations

import abc
from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Optional,
)

import attr

from dl_core.data_processing.typed_query import CEBasedTypedQueryProcessor
from dl_core.us_connection_base import ConnectionBase
from dl_core.utils import FutureRef
from dl_dashsql.typed_query.processor.base import TypedQueryProcessorBase
from dl_dashsql.typed_query.processor.cache import (
CachedTypedQueryProcessor,
DefaultTypedQueryCacheKeyBuilder,
)


if TYPE_CHECKING:
from dl_cache_engine.engine import EntityCacheEngineAsync
from dl_core.services_registry.top_level import ServicesRegistry # noqa


@@ -24,13 +32,45 @@ def service_registry(self) -> ServicesRegistry:
return self._service_registry_ref.ref

@abc.abstractmethod
def get_typed_query_processor(self, connection: ConnectionBase) -> TypedQueryProcessorBase:
def get_typed_query_processor(
self,
connection: ConnectionBase,
allow_cache_usage: bool = True,
) -> TypedQueryProcessorBase:
raise NotImplementedError


class DefaultQueryProcessorFactory(TypedQueryProcessorFactory):
def get_typed_query_processor(self, connection: ConnectionBase) -> TypedQueryProcessorBase:
def get_typed_query_processor(
self,
connection: ConnectionBase,
allow_cache_usage: bool = True,
) -> TypedQueryProcessorBase:
ce_factory = self.service_registry.get_conn_executor_factory()
conn_executor = ce_factory.get_async_conn_executor(conn=connection)
tq_processor = CEBasedTypedQueryProcessor(async_conn_executor=conn_executor)
tq_processor: TypedQueryProcessorBase = CEBasedTypedQueryProcessor(async_conn_executor=conn_executor)

allow_cache_usage = allow_cache_usage and connection.allow_cache

use_cache: bool = False
cache_engine: Optional[EntityCacheEngineAsync] = None
if allow_cache_usage and connection.cache_ttl_sec_override: # (ttl is not None and > 0)
cache_engine_factory = self.service_registry.get_cache_engine_factory()
if cache_engine_factory is not None:
cache_engine = cache_engine_factory.get_cache_engine(entity_id=connection.uuid)
if cache_engine is not None:
use_cache = True

if use_cache:
assert cache_engine is not None
tq_processor = CachedTypedQueryProcessor(
main_processor=tq_processor,
cache_key_builder=DefaultTypedQueryCacheKeyBuilder(
base_key=connection.get_cache_key_part(),
),
cache_engine=cache_engine,
refresh_ttl_on_read=False,
cache_ttl_config=self.service_registry.default_cache_ttl_config,
)

return tq_processor
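A usage sketch for the extended factory signature; the constructor argument name for the service-registry FutureRef is an assumption, since only the method bodies are visible in this hunk:

    # Hypothetical usage: caching applies only when both the caller and the connection allow it.
    factory = DefaultQueryProcessorFactory(service_registry_ref=sr_ref)  # sr_ref: FutureRef[ServicesRegistry], assumed
    processor = factory.get_typed_query_processor(connection=connection, allow_cache_usage=True)
    result = await processor.process_typed_query(typed_query=typed_query)
    # Passing allow_cache_usage=False always yields the plain CEBasedTypedQueryProcessor.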
@@ -19,6 +19,7 @@ class TestDashSQLTypedQuery(DefaultApiTestBase, DefaultDashSQLTypedQueryTestSuit

class TestDashSQLTypedQueryWithParameters(DefaultApiTestBase, DefaultDashSQLTypedQueryTestSuite):
raw_sql_level = RawSQLLevel.dashsql
data_caches_enabled = True

@pytest.fixture(scope="class")
def typed_query_info(self) -> TypedQueryInfo:
@@ -35,7 +35,6 @@
)
from dl_dashsql.literalizer import DashSQLParamLiteralizer
from dl_dashsql.typed_query.primitives import (
DataRowsTypedQueryResult,
PlainTypedQuery,
TypedQuery,
TypedQueryResult,
@@ -50,7 +49,7 @@
@attr.s(frozen=True)
class AsyncTypedQueryAdapterActionEmptyDataRows(AsyncTypedQueryAdapterAction):
async def run_typed_query_action(self, typed_query: TypedQuery) -> TypedQueryResult:
return DataRowsTypedQueryResult(
return TypedQueryResult(
query_type=typed_query.query_type,
column_headers=(),
data_rows=(),
@@ -75,8 +74,8 @@ def _make_sa_query(self, typed_query: TypedQuery) -> ClauseElement:
formatter_incoming_parameters = [
QueryIncomingParameter(
original_name=param.name,
user_type=param.user_type,
value=param.value,
user_type=param.typed_value.type,
value=param.typed_value.value,
)
for param in typed_query.parameters
]
@@ -175,7 +174,7 @@ async def _make_result(
raw_cursor_info=dba_async_result.raw_cursor_info,
data=data,
)
result = DataRowsTypedQueryResult(
result = TypedQueryResult(
query_type=typed_query.query_type,
data_rows=data,
column_headers=column_headers,
@@ -219,7 +218,7 @@ def _make_result(self, typed_query: TypedQuery, dba_sync_result: DBAdapterQueryR
raw_cursor_info = dba_sync_result.raw_cursor_info
assert raw_cursor_info is not None
column_headers = self._resolve_result_column_headers(raw_cursor_info=raw_cursor_info)
result = DataRowsTypedQueryResult(
result = TypedQueryResult(
query_type=typed_query.query_type,
data_rows=data,
column_headers=column_headers,
2 changes: 0 additions & 2 deletions lib/dl_core_testing/dl_core_testing/testcases/typed_query.py
@@ -11,7 +11,6 @@
from dl_core.us_connection_base import ConnectionBase
from dl_core_testing.testcases.connection_executor import BaseConnectionExecutorTestClass
from dl_dashsql.typed_query.primitives import (
DataRowsTypedQueryResult,
PlainTypedQuery,
TypedQuery,
TypedQueryResult,
@@ -40,7 +39,6 @@ def get_typed_query(self) -> TypedQuery:
)

def check_typed_query_result(self, typed_query_result: TypedQueryResult) -> None:
assert isinstance(typed_query_result, DataRowsTypedQueryResult)
assert typed_query_result.data_rows[0] == (1, 2, "zxc")


4 changes: 2 additions & 2 deletions lib/dl_dashsql/dl_dashsql/formatting/shortcuts.py
@@ -9,8 +9,8 @@ def params_for_formatter(params: Sequence[TypedQueryParameter]) -> Sequence[Quer
return [
QueryIncomingParameter(
original_name=param.name,
user_type=param.user_type,
value=param.value,
user_type=param.typed_value.type,
value=param.typed_value.value,
)
for param in params
]
15 changes: 5 additions & 10 deletions lib/dl_dashsql/dl_dashsql/typed_query/primitives.py
@@ -13,14 +13,13 @@
)
from dl_constants.types import TBIDataRow
import dl_dashsql.exc as exc
from dl_dashsql.types import IncomingDSQLParamTypeExt
from dl_model_tools.typed_values import BIValue


@attr.s(frozen=True, kw_only=True)
class TypedQueryParameter:
name: str = attr.ib()
user_type: UserDataType = attr.ib()
value: IncomingDSQLParamTypeExt = attr.ib()
typed_value: BIValue = attr.ib()


_PARAM_VALUE_TV = TypeVar("_PARAM_VALUE_TV")
@@ -42,7 +41,7 @@ def get_strict(self, name: str) -> TypedQueryParameter:
# typed methods
def get_typed_value(self, name: str, value_type: Type[_PARAM_VALUE_TV]) -> _PARAM_VALUE_TV:
try:
value = self.get_strict(name).value
value = self.get_strict(name).typed_value.value
if not isinstance(value, value_type):
raise exc.DashSQLParameterError(f"Parameter {name!r} has invalid type")
return value
@@ -68,18 +67,14 @@ class PlainTypedQuery(TypedQuery):
query: str = attr.ib()


@attr.s(frozen=True, kw_only=True)
class TypedQueryResult:
query_type: DashSQLQueryType = attr.ib()


@attr.s(frozen=True, kw_only=True)
class TypedQueryResultColumnHeader:
name: str = attr.ib()
user_type: UserDataType = attr.ib()


@attr.s(frozen=True, kw_only=True)
class DataRowsTypedQueryResult(TypedQueryResult):
class TypedQueryResult:
query_type: DashSQLQueryType = attr.ib()
column_headers: Sequence[TypedQueryResultColumnHeader] = attr.ib()
data_rows: Sequence[TBIDataRow] = attr.ib()
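The parameter primitive now carries a single BIValue instead of separate user_type/value fields; consumers read param.typed_value.type and param.typed_value.value, as the hunks above show. A sketch of constructing one, with the BIValue instance left as a placeholder (its constructor is not shown in this diff):

    # Hypothetical sketch: wrap an existing BIValue in a query parameter.
    param = TypedQueryParameter(
        name="limit",
        typed_value=some_bi_value,  # a BIValue exposing .type (UserDataType) and .value
    )
    user_type, value = param.typed_value.type, param.typed_value.value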
99 changes: 99 additions & 0 deletions lib/dl_dashsql/dl_dashsql/typed_query/processor/cache.py
@@ -0,0 +1,99 @@
import abc
from typing import (
Iterable,
Optional,
Sequence,
)

import attr

from dl_cache_engine.engine import EntityCacheEngineAsync
from dl_cache_engine.primitives import (
BIQueryCacheOptions,
CacheTTLConfig,
LocalKeyRepresentation,
)
from dl_cache_engine.processing_helper import CacheProcessingHelper
from dl_constants.types import TJSONExt
from dl_dashsql.typed_query.primitives import (
TypedQuery,
TypedQueryResult,
)
from dl_dashsql.typed_query.processor.base import TypedQueryProcessorBase
from dl_dashsql.typed_query.query_serialization import get_typed_query_serializer
from dl_dashsql.typed_query.result_serialization import get_typed_query_result_serializer
from dl_utils.streaming import (
AsyncChunked,
AsyncChunkedBase,
)


class TypedQueryCacheKeyBuilderBase(abc.ABC):
@abc.abstractmethod
def get_cache_key(self, typed_query: TypedQuery) -> LocalKeyRepresentation:
raise NotImplementedError


@attr.s
class DefaultTypedQueryCacheKeyBuilder(TypedQueryCacheKeyBuilderBase):
base_key: LocalKeyRepresentation = attr.ib(kw_only=True)

def get_cache_key(self, typed_query: TypedQuery) -> LocalKeyRepresentation:
tq_serializer = get_typed_query_serializer(query_type=typed_query.query_type)
serialized_query = tq_serializer.serialize(typed_query)
local_key_rep = self.base_key.extend(part_type="typed_query_data", part_content=serialized_query)
return local_key_rep


@attr.s
class CachedTypedQueryProcessor(TypedQueryProcessorBase):
_main_processor: TypedQueryProcessorBase = attr.ib(kw_only=True)
_cache_engine: EntityCacheEngineAsync = attr.ib(kw_only=True)
_cache_ttl_config: CacheTTLConfig = attr.ib(kw_only=True)
_refresh_ttl_on_read: bool = attr.ib(kw_only=True)
_cache_key_builder: TypedQueryCacheKeyBuilderBase = attr.ib(kw_only=True)

def get_cache_options(self, typed_query: TypedQuery) -> BIQueryCacheOptions:
local_key_rep = self._cache_key_builder.get_cache_key(typed_query=typed_query)
cache_options = BIQueryCacheOptions(
cache_enabled=True,
ttl_sec=self._cache_ttl_config.ttl_sec_direct,
key=local_key_rep,
refresh_ttl_on_read=self._refresh_ttl_on_read,
)
return cache_options

async def process_typed_query(self, typed_query: TypedQuery) -> TypedQueryResult:
tq_result_serializer = get_typed_query_result_serializer(query_type=typed_query.query_type)

async def generate_func() -> Optional[AsyncChunkedBase[TJSONExt]]:
source_typed_query_result = await self._main_processor.process_typed_query(typed_query=typed_query)
# Pack everything into a single string value and wrap that into a stream
dumped_serialized_data = tq_result_serializer.serialize(source_typed_query_result)
# 1 chunk of length 1, containing a row with 1 item
chunked_data: Iterable[Sequence[list[TJSONExt]]] = [[[dumped_serialized_data]]]
return AsyncChunked.from_chunked_iterable(chunked_data)

cache_helper = CacheProcessingHelper(cache_engine=self._cache_engine)
cache_options = self.get_cache_options(typed_query=typed_query)
cache_situation, chunked_stream = await cache_helper.run_with_cache(
generate_func=generate_func,
cache_options=cache_options,
allow_cache_read=True,
use_locked_cache=False,
)

# TODO: Some logging? For instance, log `cache_situation`

assert chunked_stream is not None
data_rows = await chunked_stream.all()

# Extract the serialized data
assert len(data_rows) == 1
first_row = data_rows[0]
assert isinstance(first_row, (list, tuple)) and len(first_row) == 1
loaded_serialized_data = first_row[0]
assert isinstance(loaded_serialized_data, str)

typed_query_result = tq_result_serializer.deserialize(loaded_serialized_data)
return typed_query_result
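A wiring sketch that mirrors DefaultQueryProcessorFactory above: any TypedQueryProcessorBase can be wrapped with the new cache layer. The inner processor, cache engine, TTL config, and base key are assumed to come from the services registry and connection, as in the factory code:

    # Hypothetical wiring sketch, following the factory code earlier in this diff.
    cached_processor = CachedTypedQueryProcessor(
        main_processor=inner_processor,  # e.g. a CEBasedTypedQueryProcessor
        cache_key_builder=DefaultTypedQueryCacheKeyBuilder(base_key=base_cache_key),  # a LocalKeyRepresentation
        cache_engine=cache_engine,  # EntityCacheEngineAsync for the connection entity
        refresh_ttl_on_read=False,
        cache_ttl_config=cache_ttl_config,  # CacheTTLConfig; ttl_sec_direct is used for the cache entry
    )
    typed_query_result = await cached_processor.process_typed_query(typed_query=typed_query)
    # A cache hit deserializes the stored result; a miss runs inner_processor and stores the serialized rows.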