Commit ba04959

feat(capman): Instead of flatly rejecting heavy projects, limit their bytes scanned on the query level (#6929)

Large customers sometimes complain that they get throttled: someone scans all their data once and is then locked out for 10 minutes. Not a great experience.

To ease the pain, still let their queries go through, but keep them on a short leash using ClickHouse's `max_bytes_to_read` setting.

By default this is turned off; it will not be enabled until a human is around to monitor it.
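For context, a minimal sketch of what the setting does at the ClickHouse level, assuming the clickhouse-driver client; the host and table are placeholders, not part of this commit:

```python
# Illustration only: with max_bytes_to_read set, ClickHouse aborts the query
# once the scan exceeds the cap instead of reading everything.
from clickhouse_driver import Client  # assumed available

client = Client(host="localhost")  # placeholder connection
client.execute(
    "SELECT count() FROM big_table",  # placeholder table
    settings={"max_bytes_to_read": 200_000_000},  # cap the scan at ~200 MB
)
# If big_table holds more than ~200 MB, this raises a "too many bytes"
# server error rather than running to completion.
```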
volokluev authored Mar 4, 2025
1 parent b0b0eb5 commit ba04959
Showing 7 changed files with 317 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/image.yml
@@ -55,7 +55,7 @@ jobs:
publish-to-dockerhub:
needs: build
name: Publish Snuba to DockerHub
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4 # v3.1.0
- name: Pull the test image
11 changes: 6 additions & 5 deletions snuba/query/allocation_policies/__init__.py
@@ -110,20 +110,21 @@ class QuotaAllowance:
quota_unit: str
suggestion: str

# sets this value:
# https://clickhouse.com/docs/operations/settings/settings#max_bytes_to_read
# 0 means unlimited
max_bytes_to_read: int = field(default=0)

def to_dict(self) -> dict[str, Any]:
return asdict(self)

def __lt__(self, other: QuotaAllowance) -> bool:
if self.can_run and not other.can_run:
return False
return self.max_threads < other.max_threads

def __eq__(self, other: Any) -> bool:
if not isinstance(other, QuotaAllowance):
return False
return (
self.can_run == other.can_run
and self.max_threads == other.max_threads
and self.max_bytes_to_read == other.max_bytes_to_read
and self.explanation == other.explanation
and self.is_throttled == other.is_throttled
and self.throttle_threshold == other.throttle_threshold
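A short sketch of the new field's semantics, based on the dataclass shown above; all field values other than `max_bytes_to_read` are illustrative placeholders:

```python
allowance = QuotaAllowance(
    can_run=True,
    max_threads=4,
    max_bytes_to_read=200_000_000,  # 0 (the default) means unlimited
    explanation={},
    is_throttled=True,
    throttle_threshold=10_000_000_000,
    rejection_threshold=20_000_000_000,
    quota_used=0,
    quota_unit="bytes",
    suggestion="narrow the time range",
)
assert allowance.to_dict()["max_bytes_to_read"] == 200_000_000
```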
104 changes: 78 additions & 26 deletions snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py
@@ -111,6 +111,18 @@ def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
int,
DEFAULT_THREADS_THROTTLE_DIVIDER,
),
AllocationPolicyConfig(
"limit_bytes_instead_of_rejecting",
"instead of rejecting a query, limit its bytes with max_bytes_to_read on clickhouse",
int,
0,
),
AllocationPolicyConfig(
"max_bytes_to_read_scan_limit_divider",
"if limit_bytes_instead_of_rejecting is set and the scan limit is reached, scan_limit/ max_bytes_to_read_scan_limit_divider is how many bytes each query will be capped to",
float,
1.0,
),
]

def _are_tenant_ids_valid(
@@ -230,32 +242,72 @@ def _get_quota_allowance(
granted_quota = granted_quotas[0]
used_quota = scan_limit - granted_quota.granted
if granted_quota.granted <= 0:
explanation[
"reason"
] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
This policy is exceeded when a customer is abusing a specific feature in a way that puts load on clickhouse. If this is happening to
"many customers, that may mean the feature is written in an inefficient way"""
explanation["granted_quota"] = granted_quota.granted
explanation["limit"] = scan_limit
# This is technically a high cardinality tag value however these rejections
# should not happen often therefore it should be safe to output these rejections as metrics
self.metrics.increment(
"bytes_scanned_rejection",
tags={
"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
},
)
return QuotaAllowance(
can_run=False,
max_threads=0,
explanation=explanation,
is_throttled=True,
throttle_threshold=throttle_threshold,
rejection_threshold=scan_limit,
quota_used=used_quota,
quota_unit=QUOTA_UNIT,
suggestion=SUGGESTION,
)
if self.get_config_value("limit_bytes_instead_of_rejecting"):
max_bytes_to_read = int(
scan_limit
/ self.get_config_value("max_bytes_to_read_scan_limit_divider")
)
explanation[
"reason"
] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
The query will be limited to {max_bytes_to_read} bytes
"""
explanation["granted_quota"] = granted_quota.granted
explanation["limit"] = scan_limit
# This is technically a high cardinality tag value however these rejections
# should not happen often therefore it should be safe to output these rejections as metrics

self.metrics.increment(
"bytes_scanned_limited",
tags={
"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
},
)
return QuotaAllowance(
can_run=True,
max_threads=max(
1,
self.max_threads
// self.get_config_value("threads_throttle_divider"),
),
max_bytes_to_read=max_bytes_to_read,
explanation=explanation,
is_throttled=True,
throttle_threshold=throttle_threshold,
rejection_threshold=scan_limit,
quota_used=used_quota,
quota_unit=QUOTA_UNIT,
suggestion=SUGGESTION,
)

else:
explanation[
"reason"
] = f"""{customer_tenant_key} {customer_tenant_value} is over the bytes scanned limit of {scan_limit} for referrer {referrer}.
This policy is exceeded when a customer is abusing a specific feature in a way that puts load on clickhouse. If this is happening to
"many customers, that may mean the feature is written in an inefficient way"""
explanation["granted_quota"] = granted_quota.granted
explanation["limit"] = scan_limit
# This is technically a high cardinality tag value however these rejections
# should not happen often therefore it should be safe to output these rejections as metrics

self.metrics.increment(
"bytes_scanned_rejection",
tags={
"tenant": f"{customer_tenant_key}__{customer_tenant_value}__{referrer}"
},
)
return QuotaAllowance(
can_run=False,
max_threads=0,
explanation=explanation,
is_throttled=True,
throttle_threshold=throttle_threshold,
rejection_threshold=scan_limit,
quota_used=used_quota,
quota_unit=QUOTA_UNIT,
suggestion=SUGGESTION,
)

# this checks to see if you reached the throttle threshold
if granted_quota.granted < scan_limit - throttle_threshold:
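To make the capping arithmetic above concrete, a worked example using the same (hypothetical) values as the new test further down:

```python
# A tenant with a 20 GB scan limit and a divider of 100 is no longer
# rejected outright; each of its queries is capped at ~200 MB instead.
scan_limit = 20_000_000_000
max_bytes_to_read_scan_limit_divider = 100.0

max_bytes_to_read = int(scan_limit / max_bytes_to_read_scan_limit_divider)
assert max_bytes_to_read == 200_000_000
```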
22 changes: 20 additions & 2 deletions snuba/web/db_query.py
@@ -479,12 +479,18 @@ def _raw_query(
trigger_rate_limiter = None
status = None
request_status = get_request_status(cause)

calculated_cause = cause
if isinstance(cause, RateLimitExceeded):
status = QueryStatus.RATE_LIMITED
trigger_rate_limiter = cause.extra_data.get("scope", "")
elif isinstance(cause, ClickhouseError):
error_code = cause.code
status = get_query_status_from_error_codes(error_code)
if error_code == ErrorCodes.TOO_MANY_BYTES:
calculated_cause = RateLimitExceeded(
"Query scanned more than the allocated amount of bytes"
)

with configure_scope() as scope:
fingerprint = ["{{default}}", str(cause.code), dataset_name]
@@ -519,7 +525,7 @@ def _raw_query(
"sql": sql,
"experiments": clickhouse_query.get_experiments(),
},
) from cause
) from calculated_cause
else:
stats = update_with_status(
status=QueryStatus.SUCCESS,
@@ -849,11 +855,23 @@ def _apply_allocation_policies_quota(
key: quota_allowance.to_dict()
for key, quota_allowance in quota_allowances.items()
}

stats["quota_allowance"] = {}
stats["quota_allowance"]["details"] = allowance_dicts

summary: dict[str, Any] = {}
summary["threads_used"] = min_threads_across_policies

max_bytes_to_read = min(
[qa.max_bytes_to_read for qa in quota_allowances.values()],
key=lambda mb: float("inf") if mb == 0 else mb,
)
if max_bytes_to_read != 0:
query_settings.push_clickhouse_setting(
"max_bytes_to_read", max_bytes_to_read
)
summary["max_bytes_to_read"] = max_bytes_to_read

_populate_query_status(
summary, rejection_quota_and_policy, throttle_quota_and_policy
)
@@ -878,6 +896,6 @@ def _apply_allocation_policies_quota(
"successful_query",
tags={"storage_key": allocation_policies[0].storage_key.value},
)
max_threads = min(quota_allowances.values()).max_threads
max_threads = min_threads_across_policies
span.set_data("max_threads", max_threads)
query_settings.set_resource_quota(ResourceQuota(max_threads=max_threads))
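The `min(...)` over policies above has a subtlety worth spelling out: 0 is a sentinel for "no cap", so it must lose to any real cap another policy hands back. A small sketch of that selection logic:

```python
def effective_cap(caps: list[int]) -> int:
    # Treat 0 ("unlimited") as +inf so a real cap from any policy wins.
    return min(caps, key=lambda mb: float("inf") if mb == 0 else mb)

assert effective_cap([0, 500, 1000]) == 500  # tightest real cap wins
assert effective_cap([0, 0]) == 0            # no policy set a cap
```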
@@ -293,3 +293,48 @@ def test_does_not_throttle_and_then_throttles(
allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID)
assert allowance.is_throttled == True
assert allowance.max_threads == MAX_THREAD_NUMBER // 2


@pytest.mark.redis_db
def test_limit_bytes_read(
policy: BytesScannedRejectingPolicy,
) -> None:
tenant_ids = {
"project_id": 4505240668733440,
"referrer": "api.trace-explorer.stats",
}
scan_limit = 20000000000
threads_throttle_divider = 2
max_bytes_to_read_scan_limit_divider = 100
policy.set_config_value("threads_throttle_divider", threads_throttle_divider)
policy.set_config_value("bytes_throttle_divider", 100)
policy.set_config_value("limit_bytes_instead_of_rejecting", 1)
policy.set_config_value(
"max_bytes_to_read_scan_limit_divider", max_bytes_to_read_scan_limit_divider
)

policy.set_config_value(
"referrer_all_projects_scan_limit_override",
scan_limit,
{"referrer": "api.trace-explorer.stats"},
)

_configure_policy(policy)
policy.update_quota_balance(
tenant_ids,
QUERY_ID,
QueryResultOrError(
query_result=QueryResult(
result={"profile": {"progress_bytes": scan_limit}},
extra={"stats": {}, "sql": "", "experiments": {}},
),
error=None,
),
)

allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID)
assert allowance.is_throttled
assert allowance.max_threads == MAX_THREAD_NUMBER // threads_throttle_divider
assert allowance.max_bytes_to_read == int(
scan_limit / max_bytes_to_read_scan_limit_divider
)
77 changes: 68 additions & 9 deletions tests/test_snql_api.py
@@ -32,6 +32,35 @@
from tests.helpers import override_entity_column_validator, write_unprocessed_events


class MaxBytesPolicy123(AllocationPolicy):
def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
return []

def _get_quota_allowance(
self, tenant_ids: dict[str, str | int], query_id: str
) -> QuotaAllowance:
return QuotaAllowance(
can_run=True,
max_threads=0,
max_bytes_to_read=1,
explanation={},
is_throttled=True,
throttle_threshold=MAX_THRESHOLD,
rejection_threshold=MAX_THRESHOLD,
quota_used=0,
quota_unit=NO_UNITS,
suggestion=NO_SUGGESTION,
)

def _update_quota_balance(
self,
tenant_ids: dict[str, str | int],
query_id: str,
result_or_error: QueryResultOrError,
) -> None:
return


class RejectAllocationPolicy123(AllocationPolicy):
def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
return []
@@ -1306,6 +1335,39 @@ def test_timeseries_processor_join_query(self) -> None:
in data["sql"]
)

def test_allocation_policy_max_bytes_to_read(self) -> None:
with patch(
"snuba.web.db_query._get_allocation_policies",
return_value=[
MaxBytesPolicy123(StorageKey("doesntmatter"), ["a", "b", "c"], {})
],
):
response = self.post(
"/discover/snql",
data=json.dumps(
{
"query": f"""MATCH (discover_events )
SELECT count() AS count BY project_id, tags[custom_tag]
WHERE type != 'transaction' AND project_id = {self.project_id}
AND timestamp >= toDateTime('{self.base_time.isoformat()}')
AND timestamp < toDateTime('{self.next_time.isoformat()}')
ORDER BY count ASC
LIMIT 1000""",
"referrer": "myreferrer",
"turbo": False,
"consistent": True,
"debug": True,
"tenant_ids": {"referrer": "r", "organization_id": 123},
}
),
)
assert response.status_code == 429

assert (
response.json["error"]["message"]
== "Query scanned more than the allocated amount of bytes"
)

def test_allocation_policy_violation(self) -> None:
with patch(
"snuba.web.db_query._get_allocation_policies",
@@ -1342,12 +1404,13 @@ def test_allocation_policy_violation(self) -> None:
"storage_key": "StorageKey.DOESNTMATTER",
},
"is_throttled": False,
"throttle_threshold": MAX_THRESHOLD,
"rejection_threshold": MAX_THRESHOLD,
"throttle_threshold": 1000000000000,
"rejection_threshold": 1000000000000,
"quota_used": 0,
"quota_unit": NO_UNITS,
"suggestion": NO_SUGGESTION,
},
"quota_unit": "no_units",
"suggestion": "no_suggestion",
"max_bytes_to_read": 0,
}
},
"summary": {
"threads_used": 0,
Expand All @@ -1367,10 +1430,6 @@ def test_allocation_policy_violation(self) -> None:
"throttled_by": {},
},
}

print("info")
print(info)

assert (
response.json["error"]["message"]
== f"Query on could not be run due to allocation policies, info: {info}"