diff --git a/.github/workflows/image.yml b/.github/workflows/image.yml
index d75c03555a0..cf1ea2569e0 100644
--- a/.github/workflows/image.yml
+++ b/.github/workflows/image.yml
@@ -55,7 +55,7 @@ jobs:
   publish-to-dockerhub:
     needs: build
     name: Publish Snuba to DockerHub
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4 # v3.1.0
       - name: Pull the test image
diff --git a/snuba/query/allocation_policies/__init__.py b/snuba/query/allocation_policies/__init__.py
index c4c27d0dbd5..505535801fb 100644
--- a/snuba/query/allocation_policies/__init__.py
+++ b/snuba/query/allocation_policies/__init__.py
@@ -110,20 +110,21 @@ class QuotaAllowance:
     quota_unit: str
     suggestion: str
 
+    # sets this value:
+    # https://clickhouse.com/docs/operations/settings/settings#max_bytes_to_read
+    # 0 means unlimited
+    max_bytes_to_read: int = field(default=0)
+
     def to_dict(self) -> dict[str, Any]:
         return asdict(self)
 
-    def __lt__(self, other: QuotaAllowance) -> bool:
-        if self.can_run and not other.can_run:
-            return False
-        return self.max_threads < other.max_threads
-
     def __eq__(self, other: Any) -> bool:
         if not isinstance(other, QuotaAllowance):
             return False
         return (
             self.can_run == other.can_run
             and self.max_threads == other.max_threads
+            and self.max_bytes_to_read == other.max_bytes_to_read
             and self.explanation == other.explanation
             and self.is_throttled == other.is_throttled
             and self.throttle_threshold == other.throttle_threshold
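
The new max_bytes_to_read field defaults to 0 (uncapped), so existing policies are unaffected until they opt in. A minimal sketch of a policy returning a byte-capped allowance, using the import path from this diff; all numeric values are illustrative:

    from snuba.query.allocation_policies import QuotaAllowance

    # A throttled-but-runnable allowance: the query may execute, but
    # ClickHouse will abort it after roughly 200 MB of reads.
    # max_bytes_to_read=0 (the default) would leave the query uncapped.
    allowance = QuotaAllowance(
        can_run=True,
        max_threads=5,
        max_bytes_to_read=200_000_000,
        explanation={"reason": "over scan limit; capping bytes instead of rejecting"},
        is_throttled=True,
        throttle_threshold=10_000_000_000,
        rejection_threshold=20_000_000_000,
        quota_used=20_000_000_000,
        quota_unit="bytes",
        suggestion="reduce the time range of the query",
    )
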
diff --git a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py
index 8475c9bfc7c..4ce6d08c4dc 100644
--- a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py
+++ b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py
@@ -111,6 +111,18 @@ def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
                 int,
                 DEFAULT_THREADS_THROTTLE_DIVIDER,
             ),
+            AllocationPolicyConfig(
+                "limit_bytes_instead_of_rejecting",
+                "instead of rejecting a query, limit its bytes with max_bytes_to_read on clickhouse",
+                int,
+                0,
+            ),
+            AllocationPolicyConfig(
+                "max_bytes_to_read_scan_limit_divider",
+                "if limit_bytes_instead_of_rejecting is set and the scan limit is reached, scan_limit / max_bytes_to_read_scan_limit_divider is how many bytes each query will be capped to",
+                float,
+                1.0,
+            ),
         ]
 
     def _are_tenant_ids_valid(
@@ -230,32 +242,72 @@ def _get_quota_allowance(
         granted_quota = granted_quotas[0]
         used_quota = scan_limit - granted_quota.granted
         if granted_quota.granted <= 0:
-            explanation[
-                "reason"
-            ] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
-            This policy is exceeded when a customer is abusing a specific feature in a way that puts load on clickhouse. If this is happening to
-            many customers, that may mean the feature is written in an inefficient way"""
-            explanation["granted_quota"] = granted_quota.granted
-            explanation["limit"] = scan_limit
-            # This is technically a high cardinality tag value however these rejections
-            # should not happen often therefore it should be safe to output these rejections as metris
-            self.metrics.increment(
-                "bytes_scanned_rejection",
-                tags={
-                    "tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
-                },
-            )
-            return QuotaAllowance(
-                can_run=False,
-                max_threads=0,
-                explanation=explanation,
-                is_throttled=True,
-                throttle_threshold=throttle_threshold,
-                rejection_threshold=scan_limit,
-                quota_used=used_quota,
-                quota_unit=QUOTA_UNIT,
-                suggestion=SUGGESTION,
-            )
-
+            if self.get_config_value("limit_bytes_instead_of_rejecting"):
+                max_bytes_to_read = int(
+                    scan_limit
+                    / self.get_config_value("max_bytes_to_read_scan_limit_divider")
+                )
+                explanation[
+                    "reason"
+                ] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
+                The query will be limited to {max_bytes_to_read} bytes
+                """
+                explanation["granted_quota"] = granted_quota.granted
+                explanation["limit"] = scan_limit
+                # This is technically a high-cardinality tag value, but these events
+                # should not happen often, so it is safe to output them as metrics
+
+                self.metrics.increment(
+                    "bytes_scanned_limited",
+                    tags={
+                        "tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
+                    },
+                )
+                return QuotaAllowance(
+                    can_run=True,
+                    max_threads=max(
+                        1,
+                        self.max_threads
+                        // self.get_config_value("threads_throttle_divider"),
+                    ),
+                    max_bytes_to_read=max_bytes_to_read,
+                    explanation=explanation,
+                    is_throttled=True,
+                    throttle_threshold=throttle_threshold,
+                    rejection_threshold=scan_limit,
+                    quota_used=used_quota,
+                    quota_unit=QUOTA_UNIT,
+                    suggestion=SUGGESTION,
+                )
+
+            else:
+                explanation[
+                    "reason"
+                ] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
+                This policy is exceeded when a customer is abusing a specific feature in a way that puts load on clickhouse. If this is happening to
+                many customers, that may mean the feature is written in an inefficient way"""
+                explanation["granted_quota"] = granted_quota.granted
+                explanation["limit"] = scan_limit
+                # This is technically a high-cardinality tag value, but these rejections
+                # should not happen often, so it is safe to output them as metrics
+
+                self.metrics.increment(
+                    "bytes_scanned_rejection",
+                    tags={
+                        "tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
+                    },
+                )
+                return QuotaAllowance(
+                    can_run=False,
+                    max_threads=0,
+                    explanation=explanation,
+                    is_throttled=True,
+                    throttle_threshold=throttle_threshold,
+                    rejection_threshold=scan_limit,
+                    quota_used=used_quota,
+                    quota_unit=QUOTA_UNIT,
+                    suggestion=SUGGESTION,
+                )
+
         # this checks to see if you reached the throttle threshold
         if granted_quota.granted < scan_limit - throttle_threshold:
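
Both knobs ship with safe defaults (the feature off, divider 1.0), so enabling the capping behavior is a runtime config change rather than a deploy. A sketch using the same set_config_value API the tests below exercise, on an instance of BytesScannedRejectingPolicy; the divider of 100.0 is illustrative:

    # Cap over-limit queries instead of rejecting them outright.
    policy.set_config_value("limit_bytes_instead_of_rejecting", 1)
    # Each capped query may then read at most scan_limit / 100 bytes.
    policy.set_config_value("max_bytes_to_read_scan_limit_divider", 100.0)
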
diff --git a/snuba/web/db_query.py b/snuba/web/db_query.py
index 45a28af5899..757529cb233 100644
--- a/snuba/web/db_query.py
+++ b/snuba/web/db_query.py
@@ -479,12 +479,18 @@ def _raw_query(
     trigger_rate_limiter = None
     status = None
     request_status = get_request_status(cause)
+
+    calculated_cause = cause
     if isinstance(cause, RateLimitExceeded):
         status = QueryStatus.RATE_LIMITED
         trigger_rate_limiter = cause.extra_data.get("scope", "")
     elif isinstance(cause, ClickhouseError):
         error_code = cause.code
         status = get_query_status_from_error_codes(error_code)
+        if error_code == ErrorCodes.TOO_MANY_BYTES:
+            calculated_cause = RateLimitExceeded(
+                "Query scanned more than the allocated amount of bytes"
+            )
 
     with configure_scope() as scope:
         fingerprint = ["{{default}}", str(cause.code), dataset_name]
@@ -519,7 +525,7 @@ def _raw_query(
                 "sql": sql,
                 "experiments": clickhouse_query.get_experiments(),
             },
-        ) from cause
+        ) from calculated_cause
     else:
         stats = update_with_status(
@@ -849,11 +855,23 @@ def _apply_allocation_policies_quota(
         key: quota_allowance.to_dict()
         for key, quota_allowance in quota_allowances.items()
     }
+    stats["quota_allowance"] = {}
    stats["quota_allowance"]["details"] = allowance_dicts
 
     summary: dict[str, Any] = {}
     summary["threads_used"] = min_threads_across_policies
+
+    max_bytes_to_read = min(
+        [qa.max_bytes_to_read for qa in quota_allowances.values()],
+        key=lambda mb: float("inf") if mb == 0 else mb,
+    )
+    if max_bytes_to_read != 0:
+        query_settings.push_clickhouse_setting(
+            "max_bytes_to_read", max_bytes_to_read
+        )
+        summary["max_bytes_to_read"] = max_bytes_to_read
+
     _populate_query_status(
         summary, rejection_quota_and_policy, throttle_quota_and_policy
     )
@@ -878,6 +896,6 @@ def _apply_allocation_policies_quota(
             "successful_query",
             tags={"storage_key": allocation_policies[0].storage_key.value},
         )
-    max_threads = min(quota_allowances.values()).max_threads
+    max_threads = min_threads_across_policies
     span.set_data("max_threads", max_threads)
     query_settings.set_resource_quota(ResourceQuota(max_threads=max_threads))
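
The min(...) in _apply_allocation_policies_quota treats 0 as "unlimited" by mapping it to infinity, so the tightest real cap across policies wins and 0 is chosen only when no policy sets a cap. A standalone, pure-Python illustration of that selection rule:

    def effective_cap(caps: list[int]) -> int:
        # 0 means "no cap", so it must lose to every positive cap.
        return min(caps, key=lambda mb: float("inf") if mb == 0 else mb)

    assert effective_cap([0, 5_000_000, 2_000_000]) == 2_000_000  # tightest cap wins
    assert effective_cap([0, 0]) == 0  # no policy set a cap: stay unlimited
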
diff --git a/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py b/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py
index 2712a00d97f..cf32f1d7729 100644
--- a/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py
+++ b/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py
@@ -293,3 +293,48 @@ def test_does_not_throttle_and_then_throttles(
     allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID)
     assert allowance.is_throttled == True
     assert allowance.max_threads == MAX_THREAD_NUMBER // 2
+
+
+@pytest.mark.redis_db
+def test_limit_bytes_read(
+    policy: BytesScannedRejectingPolicy,
+) -> None:
+    tenant_ids = {
+        "project_id": 4505240668733440,
+        "referrer": "api.trace-explorer.stats",
+    }
+    scan_limit = 20000000000
+    threads_throttle_divider = 2
+    max_bytes_to_read_scan_limit_divider = 100
+    policy.set_config_value("threads_throttle_divider", threads_throttle_divider)
+    policy.set_config_value("bytes_throttle_divider", 100)
+    policy.set_config_value("limit_bytes_instead_of_rejecting", 1)
+    policy.set_config_value(
+        "max_bytes_to_read_scan_limit_divider", max_bytes_to_read_scan_limit_divider
+    )
+
+    policy.set_config_value(
+        "referrer_all_projects_scan_limit_override",
+        scan_limit,
+        {"referrer": "api.trace-explorer.stats"},
+    )
+
+    _configure_policy(policy)
+    policy.update_quota_balance(
+        tenant_ids,
+        QUERY_ID,
+        QueryResultOrError(
+            query_result=QueryResult(
+                result={"profile": {"progress_bytes": scan_limit}},
+                extra={"stats": {}, "sql": "", "experiments": {}},
+            ),
+            error=None,
+        ),
+    )
+
+    allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID)
+    assert allowance.is_throttled
+    assert allowance.max_threads == MAX_THREAD_NUMBER // threads_throttle_divider
+    assert allowance.max_bytes_to_read == int(
+        scan_limit / max_bytes_to_read_scan_limit_divider
+    )
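
The expected values in this test follow directly from the configured dividers; a quick check of the arithmetic behind the assertions, with the numbers copied from the test:

    scan_limit = 20_000_000_000
    max_bytes_to_read_scan_limit_divider = 100
    # byte cap applied to each over-limit query
    assert int(scan_limit / max_bytes_to_read_scan_limit_divider) == 200_000_000
    # threads_throttle_divider = 2 halves the threads: MAX_THREAD_NUMBER // 2
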
diff --git a/tests/test_snql_api.py b/tests/test_snql_api.py
index c895b91da76..567333bab87 100644
--- a/tests/test_snql_api.py
+++ b/tests/test_snql_api.py
@@ -32,6 +32,35 @@
 from tests.helpers import override_entity_column_validator, write_unprocessed_events
 
 
+class MaxBytesPolicy123(AllocationPolicy):
+    def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
+        return []
+
+    def _get_quota_allowance(
+        self, tenant_ids: dict[str, str | int], query_id: str
+    ) -> QuotaAllowance:
+        return QuotaAllowance(
+            can_run=True,
+            max_threads=0,
+            max_bytes_to_read=1,
+            explanation={},
+            is_throttled=True,
+            throttle_threshold=MAX_THRESHOLD,
+            rejection_threshold=MAX_THRESHOLD,
+            quota_used=0,
+            quota_unit=NO_UNITS,
+            suggestion=NO_SUGGESTION,
+        )
+
+    def _update_quota_balance(
+        self,
+        tenant_ids: dict[str, str | int],
+        query_id: str,
+        result_or_error: QueryResultOrError,
+    ) -> None:
+        return
+
+
 class RejectAllocationPolicy123(AllocationPolicy):
     def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
         return []
@@ -1306,6 +1335,39 @@ def test_timeseries_processor_join_query(self) -> None:
             in data["sql"]
         )
 
+    def test_allocation_policy_max_bytes_to_read(self) -> None:
+        with patch(
+            "snuba.web.db_query._get_allocation_policies",
+            return_value=[
+                MaxBytesPolicy123(StorageKey("doesntmatter"), ["a", "b", "c"], {})
+            ],
+        ):
+            response = self.post(
+                "/discover/snql",
+                data=json.dumps(
+                    {
+                        "query": f"""MATCH (discover_events )
+                        SELECT count() AS count BY project_id, tags[custom_tag]
+                        WHERE type != 'transaction' AND project_id = {self.project_id}
+                        AND timestamp >= toDateTime('{self.base_time.isoformat()}')
+                        AND timestamp < toDateTime('{self.next_time.isoformat()}')
+                        ORDER BY count ASC
+                        LIMIT 1000""",
+                        "referrer": "myreferrer",
+                        "turbo": False,
+                        "consistent": True,
+                        "debug": True,
+                        "tenant_ids": {"referrer": "r", "organization_id": 123},
+                    }
+                ),
+            )
+            assert response.status_code == 429
+
+            assert (
+                response.json["error"]["message"]
+                == "Query scanned more than the allocated amount of bytes"
+            )
+
     def test_allocation_policy_violation(self) -> None:
         with patch(
             "snuba.web.db_query._get_allocation_policies",
@@ -1342,12 +1404,13 @@ def test_allocation_policy_violation(self) -> None:
                         "storage_key": "StorageKey.DOESNTMATTER",
                     },
                     "is_throttled": False,
-                    "throttle_threshold": MAX_THRESHOLD,
-                    "rejection_threshold": MAX_THRESHOLD,
+                    "throttle_threshold": 1000000000000,
+                    "rejection_threshold": 1000000000000,
                     "quota_used": 0,
-                    "quota_unit": NO_UNITS,
-                    "suggestion": NO_SUGGESTION,
-                },
+                    "quota_unit": "no_units",
+                    "suggestion": "no_suggestion",
+                    "max_bytes_to_read": 0,
+                }
             },
             "summary": {
                 "threads_used": 0,
@@ -1367,10 +1430,6 @@ def test_allocation_policy_violation(self) -> None:
                 "throttled_by": {},
             },
         }
-
-        print("info")
-        print(info)
-
         assert (
             response.json["error"]["message"]
             == f"Query on could not be run due to allocation policies, info: {info}"
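
The 429 asserted above comes from the _raw_query change earlier in this diff: when ClickHouse aborts a capped query it fails with ErrorCodes.TOO_MANY_BYTES, which is re-raised as RateLimitExceeded and surfaces as a rate-limit response. A sketch of that classification in isolation, assuming ErrorCodes is the clickhouse_driver enum Snuba already imports:

    from clickhouse_driver.errors import ErrorCodes  # assumed source of ErrorCodes

    def is_byte_cap_abort(error_code: int) -> bool:
        # Mirrors the new branch in _raw_query: a byte-cap abort is reported
        # to API clients as rate limiting (HTTP 429), not as a server error.
        return error_code == ErrorCodes.TOO_MANY_BYTES
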
quota_unit="concurrent_queries", + suggestion=NO_SUGGESTION, + max_bytes_to_read=1, + ) + + def _update_quota_balance( + self, + tenant_ids: dict[str, str | int], + query_id: str, + result_or_error: QueryResultOrError, + ) -> None: + pass + + query, storage, attribution_info = _build_test_query( + "count(distinct(project_id))", + [ + MaxBytesPolicy(StorageKey("doesntmatter"), ["a", "b", "c"], {}), + ], + ) + + query_metadata_list: list[ClickhouseQueryMetadata] = [] + stats: dict[str, Any] = {} + settings = HTTPQuerySettings() + db_query( + clickhouse_query=query, + query_settings=settings, + attribution_info=attribution_info, + dataset_name="events", + query_metadata_list=query_metadata_list, + formatted_query=format_query(query), + reader=storage.get_cluster().get_reader(), + timer=Timer("foo"), + stats=stats, + trace_id="trace_id", + robust=False, + ) + + assert stats["quota_allowance"] == { + "details": { + "MaxBytesPolicy": { + "can_run": True, + "explanation": {"storage_key": "StorageKey.DOESNTMATTER"}, + "is_throttled": True, + "max_bytes_to_read": 1, + "max_threads": 10, + "quota_unit": "concurrent_queries", + "quota_used": 123, + "rejection_threshold": 42069, + "suggestion": "no_suggestion", + "throttle_threshold": 420, + } + }, + "summary": { + "is_rejected": False, + "is_successful": False, + "is_throttled": True, + "max_bytes_to_read": 1, + "rejected_by": {}, + "rejection_storage_key": None, + "threads_used": 10, + "throttle_storage_key": "StorageKey.DOESNTMATTER", + "throttled_by": { + "policy": "MaxBytesPolicy", + "quota_unit": "concurrent_queries", + "quota_used": 123, + "storage_key": "StorageKey.DOESNTMATTER", + "suggestion": "no_suggestion", + "throttle_threshold": 420, + }, + }, + }