diff --git a/pyatlan/client/search_log.py b/pyatlan/client/search_log.py index c92085df..41e039ea 100644 --- a/pyatlan/client/search_log.py +++ b/pyatlan/client/search_log.py @@ -1,3 +1,4 @@ +import logging from typing import List, Union from pydantic.v1 import ValidationError, parse_obj_as, validate_arguments @@ -5,6 +6,7 @@ from pyatlan.client.common import ApiCaller from pyatlan.client.constants import SEARCH_LOG from pyatlan.errors import ErrorCode +from pyatlan.model.search import SortItem from pyatlan.model.search_log import ( AssetViews, SearchLogEntry, @@ -16,6 +18,7 @@ UNIQUE_USERS = "uniqueUsers" UNIQUE_ASSETS = "uniqueAssets" +LOGGER = logging.getLogger(__name__) class SearchLogClient: @@ -67,27 +70,69 @@ def _call_search_api(self, criteria: SearchLogRequest) -> dict: :param criteria: An instance of SearchLogRequest detailing the search query, parameters, etc. :return: A dictionary representing the raw JSON response from the search API. """ - return self._client._call_api( - SEARCH_LOG, - request_obj=criteria, + return self._client._call_api(SEARCH_LOG, request_obj=criteria) + + @staticmethod + def _prepare_sorts_for_sl_bulk_search( + sorts: List[SortItem], + ) -> List[SortItem]: + """ + Ensures that sorting by creation timestamp is prioritized for search log bulk searches. + + :param sorts: list of existing sorting options. + :returns: a modified list of sorting options with creation timestamp as the top priority. + """ + if not SearchLogResults.presorted_by_timestamp(sorts): + return SearchLogResults.sort_by_timestamp_first(sorts) + return sorts + + def _get_bulk_search_log_message(self, bulk): + return ( + ( + "Search log bulk search option is enabled. " + if bulk + else "Result size (%s) exceeds threshold (%s). " + ) + + "Ignoring requests for offset-based paging and using timestamp-based paging instead." ) @validate_arguments def search( - self, criteria: SearchLogRequest + self, criteria: SearchLogRequest, bulk=False ) -> Union[SearchLogViewResults, SearchLogResults]: """ - Search for assets using the provided criteria. + Search for search logs using the provided criteria. + `Note:` if the number of results exceeds the predefined threshold + (10,000 search logs), this will be automatically converted into a search log `bulk` search. :param criteria: detailing the search query, parameters, and so on to run + :param bulk: whether to run the search to retrieve search logs that match the supplied criteria, + for large numbers of results (> `10,000`), defaults to `False`. Note: this will reorder the results + (based on creation timestamp) in order to iterate through a large number (more than `10,000`) of results. + :raises InvalidRequestError: + + - if search log bulk search is enabled (`bulk=True`) and any + user-specified sorting options are found in the search request. + - if search log bulk search is disabled (`bulk=False`) and the number of results + exceeds the predefined threshold (i.e. `10,000` search logs) + and any user-specified sorting options are found in the search request.
+ :raises AtlanError: on any API communication issue + :returns: the results of the search """ + if bulk: + if criteria.dsl.sort and len(criteria.dsl.sort) > 2: + raise ErrorCode.UNABLE_TO_RUN_SEARCH_LOG_BULK_WITH_SORTS.exception_with_parameters() + criteria.dsl.sort = self._prepare_sorts_for_sl_bulk_search( + criteria.dsl.sort + ) + LOGGER.debug(self._get_bulk_search_log_message(bulk)) user_views = [] asset_views = [] log_entries = [] raw_json = self._call_search_api(criteria) count = raw_json.get("approximateCount", 0) + if "aggregations" in raw_json and UNIQUE_USERS in raw_json.get( "aggregations", {} ): @@ -140,6 +185,21 @@ def search( raise ErrorCode.JSON_ERROR.exception_with_parameters( raw_json, 200, str(err) ) from err + if ( + count > SearchLogResults._MASS_EXTRACT_THRESHOLD + and not SearchLogResults.presorted_by_timestamp(criteria.dsl.sort) + ): + if criteria.dsl.sort and len(criteria.dsl.sort) > 2: + raise ErrorCode.UNABLE_TO_RUN_SEARCH_LOG_BULK_WITH_SORTS.exception_with_parameters() + criteria.dsl.sort = self._prepare_sorts_for_sl_bulk_search( + criteria.dsl.sort + ) + LOGGER.debug( + self._get_bulk_search_log_message(bulk), + count, + SearchLogResults._MASS_EXTRACT_THRESHOLD, + ) + return self.search(criteria) return SearchLogResults( client=self._client, criteria=criteria, @@ -148,4 +208,5 @@ def search( count=count, log_entries=log_entries, aggregations={}, + bulk=bulk, ) diff --git a/pyatlan/errors.py b/pyatlan/errors.py index 0fa8e110..994b2d8b 100644 --- a/pyatlan/errors.py +++ b/pyatlan/errors.py @@ -579,6 +579,14 @@ class ErrorCode(Enum): + "audit search request when performing a bulk search.", InvalidRequestError, ) + UNABLE_TO_RUN_SEARCH_LOG_BULK_WITH_SORTS = ( + 400, + "ATLAN-PYTHON-400-067", + "Unable to execute search log bulk search with user-defined sorting options.", + "Please ensure that no sorting options are included in your " + + "search log search request when performing a bulk search.", + InvalidRequestError, + ) MISSING_NAME = ( 400, "ATLAN-PYTHON-400-067", diff --git a/pyatlan/model/search_log.py b/pyatlan/model/search_log.py index c3ad6db1..a9979e6b 100644 --- a/pyatlan/model/search_log.py +++ b/pyatlan/model/search_log.py @@ -1,7 +1,7 @@ from __future__ import annotations from datetime import datetime -from typing import Any, Dict, Generator, Iterable, List, Optional +from typing import Any, Dict, Generator, Iterable, List, Optional, Set from pydantic.v1 import Field, ValidationError, parse_obj_as @@ -9,12 +9,14 @@ from pyatlan.client.constants import SEARCH_LOG from pyatlan.errors import ErrorCode from pyatlan.model.aggregation import Aggregation +from pyatlan.model.assets import Asset from pyatlan.model.core import AtlanObject from pyatlan.model.enums import UTMTags from pyatlan.model.search import ( DSL, Bool, Query, + Range, SearchRequest, SortItem, SortOrder, @@ -22,6 +24,8 @@ Terms, ) +BY_TIMESTAMP = [SortItem("timestamp", order=SortOrder.ASCENDING)] + class SearchLogRequest(SearchRequest): """Class from which to configure a search against Atlan's search log.""" @@ -65,7 +69,6 @@ def _get_view_dsl_kwargs( sort = sort or [] query_filter = query_filter or [] exclude_users = exclude_users or [] - BY_TIMESTAMP = [SortItem("timestamp", order=SortOrder.ASCENDING)] return dict( size=size, from_=from_, @@ -362,6 +365,9 @@ def asset_views(self) -> Optional[List[AssetViews]]: class SearchLogResults(Iterable): """Captures the response from a search against Atlan's recent search logs.""" + _DEFAULT_SIZE = DSL.__fields__.get("size").default or 300 # type: 
ignore[union-attr] + _MASS_EXTRACT_THRESHOLD = 10000 - _DEFAULT_SIZE + def __init__( self, client: ApiCaller, @@ -371,15 +377,21 @@ def __init__( count: int, log_entries: List[SearchLogEntry], aggregations: Dict[str, Aggregation], + bulk: bool = False, ): self._client = client self._endpoint = SEARCH_LOG self._criteria = criteria self._start = start self._size = size - self._count = count self._log_entries = log_entries + self._count = count + self._approximate_count = count self._aggregations = aggregations + self._bulk = bulk + self._first_record_creation_time = -2 + self._last_record_creation_time = -2 + self._processed_log_entries: Set[str] = set() @property def count(self) -> int: @@ -393,6 +405,16 @@ def current_page(self) -> List[SearchLogEntry]: """ return self._log_entries + def _get_sl_unique_key(self, entity: SearchLogEntry) -> str: + """ + Returns a unique key for a `SearchLogEntry` + by combining the `entity_guids_all[0]` with the timestamp. + + NOTE: This is necessary because the search log API + does not provide a unique identifier for logs + """ + return f"{entity.entity_guids_all[0]}:{entity.timestamp}" + def next_page(self, start=None, size=None) -> bool: """ Indicates whether there is a next page of results. @@ -400,8 +422,21 @@ def next_page(self, start=None, size=None) -> bool: :returns: True if there is a next page of results, otherwise False """ self._start = start or self._start + self._size + is_bulk_search = ( + self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD + ) if size: self._size = size + + if is_bulk_search: + # Used in the "timestamp-based" paging approach + # to check if search log with the unique key "_get_sl_unique_key()" + # has already been processed in a previous page of results. + # If it has, then exclude it from the current results; + # otherwise, we may encounter duplicate search log records. + self._processed_log_entries.update( + self._get_sl_unique_key(entity) for entity in self._log_entries + ) return self._get_next_page() if self._log_entries else False def _get_next_page(self): @@ -410,14 +445,21 @@ def _get_next_page(self): :returns: True if the next page of results was fetched, False if there was no next page """ + query = self._criteria.dsl.query self._criteria.dsl.from_ = self._start self._criteria.dsl.size = self._size - if raw_json := self._get_next_page_json(): + is_bulk_search = ( + self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD + ) + if is_bulk_search: + self._prepare_query_for_timestamp_paging(query) + + if raw_json := self._get_next_page_json(is_bulk_search): self._count = raw_json.get("approximateCount", 0) return True return False - def _get_next_page_json(self): + def _get_next_page_json(self, is_bulk_search: bool = False): """ Fetches the next page of results and returns the raw JSON of the retrieval. @@ -427,17 +469,138 @@ def _get_next_page_json(self): self._endpoint, request_obj=self._criteria, ) + if "logs" not in raw_json or not raw_json["logs"]: self._log_entries = [] return None try: self._log_entries = parse_obj_as(List[SearchLogEntry], raw_json["logs"]) + if is_bulk_search: + self._update_first_last_record_creation_times() + self._filter_processed_entities() return raw_json except ValidationError as err: raise ErrorCode.JSON_ERROR.exception_with_parameters( raw_json, 200, str(err) ) from err + def _filter_processed_entities(self): + """ + Removes entities that have already been processed to avoid duplicates. 
+ """ + self._log_entries = [ + entity + for entity in self._log_entries + if self._get_sl_unique_key(entity) not in self._processed_log_entries + ] + + def _prepare_query_for_timestamp_paging(self, query: Query): + """ + Adjusts the query to include timestamp filters for search log bulk extraction. + """ + rewritten_filters = [] + if isinstance(query, Bool): + for filter_ in query.filter: + if self._is_paging_timestamp_query(filter_): + continue + rewritten_filters.append(filter_) + + if self._first_record_creation_time != self._last_record_creation_time: + rewritten_filters.append( + self._get_paging_timestamp_query(self._last_record_creation_time) + ) + if isinstance(query, Bool): + rewritten_query = Bool( + filter=rewritten_filters, + must=query.must, + must_not=query.must_not, + should=query.should, + boost=query.boost, + minimum_should_match=query.minimum_should_match, + ) + else: + # If a Term, Range, etc query type is found + # in the DSL, append it to the Bool `filter`. + rewritten_filters.append(query) + rewritten_query = Bool(filter=rewritten_filters) + self._criteria.dsl.from_ = 0 + self._criteria.dsl.query = rewritten_query + else: + # Ensure that when switching to offset-based paging, if the first and last record timestamps are the same, + # we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query. + # Instead, ensure the search runs with only SortItem(field='__timestamp', order=). + # Failing to do so can lead to incomplete results (less than the approximate count) when running the search + # with a small page size. + if isinstance(query, Bool): + for filter_ in query.filter: + if self._is_paging_timestamp_query(filter_): + query.filter.remove(filter_) + + # Always ensure that the offset is set to the length of the processed assets + # instead of the default (start + size), as the default may skip some assets + # and result in incomplete results (less than the approximate count) + self._criteria.dsl.from_ = len(self._processed_log_entries) + + @staticmethod + def _get_paging_timestamp_query(last_timestamp: int) -> Query: + return Range(field="createdAt", gte=last_timestamp) + + @staticmethod + def _is_paging_timestamp_query(filter_: Query) -> bool: + return ( + isinstance(filter_, Range) + and filter_.field == "createdAt" + and filter_.gte is not None + ) + + def _update_first_last_record_creation_times(self): + if self._log_entries and len(self._log_entries) > 1: + self._first_record_creation_time = self._log_entries[0].created_at + self._last_record_creation_time = self._log_entries[-1].created_at + + @staticmethod + def presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool: + """ + Checks if the sorting options prioritize creation time in ascending order. + :param sorts: list of sorting options or None. + :returns: True if sorting is already prioritized by creation time, False otherwise. + """ + if sorts and isinstance(sorts[0], SortItem): + return ( + sorts[0].field == "createdAt" and sorts[0].order == SortOrder.ASCENDING + ) + return False + + @staticmethod + def sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem]: + """ + Rewrites the sorting options to ensure that + sorting by creation time, ascending, is the top + priority. Adds this condition if it does not + already exist, or moves it up to the top sorting + priority if it does already exist in the list. 
+ + :param sorts: list of sorting options + :returns: sorting options, making sorting by + creation time in ascending order the top priority + """ + creation_asc_sort = [SortItem("createdAt", order=SortOrder.ASCENDING)] + if not sorts: + return creation_asc_sort + + rewritten_sorts = [ + sort + for sort in sorts + # Added a condition to disable "timestamp" sorting when bulk search for logs is enabled, + # as sorting is already handled based on "createdAt" in this case. + if ( + (not sort.field) + or (sort.field != Asset.CREATE_TIME.internal_field_name) + ) + and (sort not in BY_TIMESTAMP) + ] + return creation_asc_sort + rewritten_sorts + def __iter__(self) -> Generator[SearchLogEntry, None, None]: """ Iterates through the results, lazily-fetching each next page until there diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index 62dd5b86..209e674b 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -7,7 +7,8 @@ from pydantic.v1 import StrictStr from pyatlan.client.atlan import DEFAULT_RETRY, AtlanClient -from pyatlan.client.audit import LOGGER +from pyatlan.client.audit import LOGGER as AUDIT_LOGGER +from pyatlan.client.search_log import LOGGER as SEARCH_LOG_LOGGER from pyatlan.client.search_log import ( AssetViews, SearchLogRequest, @@ -634,7 +635,7 @@ def _assert_audit_search_results(results, expected_sorts, size, total_count): @pytest.mark.order(after="test_audit_find_by_user") -@patch.object(LOGGER, "debug") +@patch.object(AUDIT_LOGGER, "debug") def test_audit_search_pagination( mock_logger, audit_info: AuditInfo, client: AtlanClient ): @@ -946,6 +947,119 @@ def test_search_log_views_by_guid( assert len(response.current_page()) == 0 +@pytest.fixture(scope="module") +def generate_search_logs(client: AtlanClient, sl_glossary: AtlasGlossary): + log_count = 5 + + for _ in range(log_count): + _view_test_glossary_by_search(client, sl_glossary) + time.sleep(1) + + request = SearchLogRequest.views_by_guid(guid=sl_glossary.guid, size=20) + response = client.search_log.search(request) + assert ( + response.count >= log_count + ), f"Expected at least {log_count} logs, but got {response.count}." 
+ + +def _assert_search_log_results( + results, expected_sorts, size, TOTAL_LOG_ENTRIES, bulk=False +): + assert results.count > size + assert len(results.current_page()) == size + counter = 0 + for log in results: + assert log + counter += 1 + assert counter == TOTAL_LOG_ENTRIES + assert results + assert results._bulk is bulk + assert results._criteria.dsl.sort == expected_sorts + + +@patch.object(SEARCH_LOG_LOGGER, "debug") +def test_search_log_pagination( + mock_logger, generate_search_logs, sl_glossary: AtlasGlossary, client: AtlanClient +): + size = 2 + # Test search logs by GUID with default offset-based pagination + search_log_request = SearchLogRequest.views_by_guid( + guid=sl_glossary.guid, + size=size, + exclude_users=[], + ) + + results = client.search_log.search(criteria=search_log_request, bulk=False) + TOTAL_LOG_ENTRIES = results.count + + expected_sorts = [ + SortItem(field="timestamp", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + _assert_search_log_results(results, expected_sorts, size, TOTAL_LOG_ENTRIES) + + # Test search logs by GUID with `bulk` option using timestamp-based pagination + search_log_request = SearchLogRequest.views_by_guid( + guid=sl_glossary.guid, + size=size, + exclude_users=[], + ) + results = client.search_log.search(criteria=search_log_request, bulk=True) + expected_sorts = [ + SortItem(field="createdAt", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + _assert_search_log_results(results, expected_sorts, size, TOTAL_LOG_ENTRIES, True) + assert mock_logger.call_count == 1 + assert ( + "Search log bulk search option is enabled." + in mock_logger.call_args_list[0][0][0] + ) + mock_logger.reset_mock() + + # When the number of results exceeds the predefined threshold and bulk=True + with patch.object(SearchLogResults, "_MASS_EXTRACT_THRESHOLD", -1): + search_log_request = SearchLogRequest.views_by_guid( + guid=sl_glossary.guid, + size=size, + exclude_users=[], + ) + results = client.search_log.search(criteria=search_log_request, bulk=True) + expected_sorts = [ + SortItem(field="createdAt", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + _assert_search_log_results( + results, expected_sorts, size, TOTAL_LOG_ENTRIES, True + ) + assert mock_logger.call_count < TOTAL_LOG_ENTRIES + assert ( + "Search log bulk search option is enabled." + in mock_logger.call_args_list[0][0][0] + ) + mock_logger.reset_mock() + + # When results exceed threshold and bulk=False, SDK auto-switches to bulk search + with patch.object(SearchLogResults, "_MASS_EXTRACT_THRESHOLD", -1): + search_log_request = SearchLogRequest.views_by_guid( + guid=sl_glossary.guid, + size=size, + exclude_users=[], + ) + results = client.search_log.search(criteria=search_log_request, bulk=False) + expected_sorts = [ + SortItem(field="createdAt", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + _assert_search_log_results(results, expected_sorts, size, TOTAL_LOG_ENTRIES) + assert mock_logger.call_count < TOTAL_LOG_ENTRIES + assert ( + "Result size (%s) exceeds threshold (%s)." 
+ in mock_logger.call_args_list[0][0][0] + ) + mock_logger.reset_mock() + + def test_search_log_default_sorting(client: AtlanClient, sl_glossary: AtlasGlossary): # Empty sorting request = SearchLogRequest.views_by_guid(guid=sl_glossary.guid, size=10, sort=[]) diff --git a/tests/unit/data/search_responses/search_log_search_paging.json b/tests/unit/data/search_responses/search_log_search_paging.json new file mode 100644 index 00000000..dc10214e --- /dev/null +++ b/tests/unit/data/search_responses/search_log_search_paging.json @@ -0,0 +1,42 @@ +{ + "approximateCount": 1, + "logs": [ + { + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "host": "fieldtec34.atlan.com", + "ipAddress": "10.159.1.190", + "userName": "aryaman", + "entityGuidsAll": ["2be6f58d-fd8a-44fe-a43e-f97b150cd1dd"], + "entityQFNamesAll": ["W7YoLhzkzLnaqWpLg2k5M@LwP6giikgVbDg1lDyiZzx"], + "entityGuidsAllowed": ["2be6f58d-fd8a-44fe-a43e-f97b150cd1dd"], + "entityQFNamesAllowed": ["W7YoLhzkzLnaqWpLg2k5M@LwP6giikgVbDg1lDyiZzx"], + "entityTypeNamesAll": ["AtlasGlossaryTerm"], + "entityTypeNamesAllowed": ["AtlasGlossaryTerm"], + "utmTags": ["project_webapp", "action_asset_viewed", "asset_type_atlasglossaryterm", "page_glossary", "ui_profile"], + "hasResult": true, + "resultsCount": 1, + "responseTime": 63, + "createdAt": 1733492035914, + "timestamp": 1733492035849, + "failed": false, + "request.dsl": { + "from": 0, + "size": 1, + "query": { + "bool": { + "filter": { + "bool": { + "must": {"term": {"__guid": "2be6f58d-fd8a-44fe-a43e-f97b150cd1dd"}}, + "must_not": [{"term": {"isPartial": "true"}}] + } + } + } + }, + "sort": [] + }, + "request.dslText": "{\"from\":0,\"size\":1,\"query\":{\"bool\":{\"filter\":{\"bool\":{\"must\":{\"term\":{\"__guid\":\"2be6f58d-fd8a-44fe-a43e-f97b150cd1dd\"}},\"must_not\":[{\"term\":{\"isPartial\":\"true\"}}]}}}},\"sort\":[]}", + "request.attributes": ["metricTimestampColumn", "termType", "glossaryType"], + "request.relationAttributes": ["outputs", "inputs"] + } + ] + } diff --git a/tests/unit/test_search_log_search.py b/tests/unit/test_search_log_search.py new file mode 100644 index 00000000..b2f116bd --- /dev/null +++ b/tests/unit/test_search_log_search.py @@ -0,0 +1,166 @@ +from datetime import datetime, timezone +from json import load +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from pyatlan.client.search_log import LOGGER, SearchLogClient +from pyatlan.errors import InvalidRequestError +from pyatlan.model.enums import SortOrder +from pyatlan.model.search import SortItem +from pyatlan.model.search_log import SearchLogRequest, SearchLogResults + +SEARCH_RESPONSES_DIR = Path(__file__).parent / "data" / "search_responses" +SEARCH_LOGS_JSON = "search_log_search_paging.json" + + +@pytest.fixture(autouse=True) +def set_env(monkeypatch): + monkeypatch.setenv("ATLAN_BASE_URL", "https://name.atlan.com") + monkeypatch.setenv("ATLAN_API_KEY", "abkj") + + +@pytest.fixture(scope="module") +def mock_api_caller(): + return Mock() + + +@pytest.fixture() +def search_logs_json(): + def load_json(filename): + with (SEARCH_RESPONSES_DIR / filename).open() as input_file: + return load(input_file) + + return load_json(SEARCH_LOGS_JSON) + + +def _assert_search_log_results(results, response_json, sorts, bulk=False): + for log in results: + assert log.user_name == response_json["logs"][0]["userName"] + assert log.user_agent == response_json["logs"][0]["userAgent"] + assert log.ip_address == 
response_json["logs"][0]["ipAddress"] + assert log.host == response_json["logs"][0]["host"] + expected_timestamp = datetime.fromtimestamp( + response_json["logs"][0]["timestamp"] / 1000, tz=timezone.utc + ) + assert log.timestamp == expected_timestamp + assert log.entity_guids_all == response_json["logs"][0]["entityGuidsAll"] + + assert results.count == response_json["approximateCount"] + assert results._bulk == bulk + assert results._criteria.dsl.sort == sorts + + +@patch.object(LOGGER, "debug") +def test_search_log_pagination(mock_logger, mock_api_caller, search_logs_json): + client = SearchLogClient(mock_api_caller) + mock_api_caller._call_api.side_effect = [search_logs_json, search_logs_json, {}] + + # Test default pagination + search_log_request = SearchLogRequest.views_by_guid( + guid="some-guid", + size=2, + exclude_users=["atlansupport"], + ) + + response = client.search(criteria=search_log_request, bulk=False) + expected_sorts = [ + SortItem(field="timestamp", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + + _assert_search_log_results(response, search_logs_json, expected_sorts) + assert mock_api_caller._call_api.call_count == 3 + assert mock_logger.call_count == 0 + mock_api_caller.reset_mock() + + # Test bulk pagination + mock_api_caller._call_api.side_effect = [search_logs_json, search_logs_json, {}] + response = client.search(criteria=search_log_request, bulk=True) + expected_sorts = [ + SortItem(field="createdAt", order=SortOrder.ASCENDING), + SortItem(field="entityGuidsAll", order=SortOrder.ASCENDING), + ] + + _assert_search_log_results(response, search_logs_json, expected_sorts, bulk=True) + # The call count will be 2 because both + # log entries are processed in the first API call. + # In the second API call, self._log_entries + # becomes 0, which breaks the pagination. + # This differs from offset-based pagination + # where an additional API call is needed + # to verify if the results are empty + assert mock_api_caller._call_api.call_count == 2 + assert mock_logger.call_count == 1 + assert ( + "Search log bulk search option is enabled." + in mock_logger.call_args_list[0][0][0] + ) + mock_logger.reset_mock() + mock_api_caller.reset_mock() + + # Test automatic bulk search conversion when exceeding threshold + with patch.object(SearchLogResults, "_MASS_EXTRACT_THRESHOLD", -1): + mock_api_caller._call_api.side_effect = [ + # Extra call to re-fetch the first page + # results with updated timestamp sorting + search_logs_json, + search_logs_json, + search_logs_json, + {}, + ] + search_log_request = SearchLogRequest.views_by_guid( # + guid="some-guid", + size=1, + exclude_users=["atlansupport"], + ) + response = client.search(criteria=search_log_request) + _assert_search_log_results( + response, search_logs_json, expected_sorts, bulk=False + ) + assert mock_logger.call_count == 1 + assert mock_api_caller._call_api.call_count == 3 + assert ( + "Result size (%s) exceeds threshold (%s)" + in mock_logger.call_args_list[0][0][0] + ) + + # Test exception for bulk=False with user-defined sorting and results exceeding the threshold + search_log_request = SearchLogRequest.views_by_guid( + guid="some-guid", + size=1, + sort=[SortItem(field="some-sort1", order=SortOrder.ASCENDING)], + exclude_users=["atlansupport"], + ) + with pytest.raises( + InvalidRequestError, + match=( + "ATLAN-PYTHON-400-067 Unable to execute " + "search log bulk search with user-defined sorting options. 
" + "Suggestion: Please ensure that no sorting options are " + "included in your search log search request when performing a bulk search." + ), + ): + client.search(criteria=search_log_request, bulk=False) + + # Test exception for bulk=True with user-defined sorting + search_log_request = SearchLogRequest.views_by_guid( + guid="some-guid", + size=1, + sort=[SortItem(field="some-sort2", order=SortOrder.ASCENDING)], + exclude_users=["atlansupport"], + ) + with pytest.raises( + InvalidRequestError, + match=( + "ATLAN-PYTHON-400-067 Unable to execute " + "search log bulk search with user-defined sorting options. " + "Suggestion: Please ensure that no sorting options are " + "included in your search log search request when performing a bulk search." + ), + ): + client.search(criteria=search_log_request, bulk=True) + + mock_logger.reset_mock() + mock_api_caller.reset_mock()