address review
relud committed Nov 6, 2024
1 parent 2704c10 commit 991c724
Showing 7 changed files with 93 additions and 124 deletions.
4 changes: 1 addition & 3 deletions socorro/external/es/crashstorage.py
@@ -250,8 +250,6 @@ def build_document(src, crash_document, fields, all_keys):

        elif storage_type == "boolean":
            value = fix_boolean(value)
-            if value is None:
-                continue

        for dest_key in get_destination_keys(field):
            if dest_key in all_keys:
@@ -315,7 +313,7 @@ def build_supersearch(self):
    def build_search(self, **kwargs):
        """Return new instance of elasticsearch_dsl's Search."""
        with self.client() as conn:
-            return Search(using=conn, doc_type=self.get_doctype(), **kwargs)
+            return Search(using=conn, **kwargs)

    @staticmethod
    def get_source_key(field):
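
The build_search change above drops the doc_type argument, which goes away along with mapping types in newer Elasticsearch and elasticsearch_dsl releases. A minimal sketch of constructing and running a Search in the new style; the connection URL, index name, and filter field here are illustrative, not Socorro's configuration:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

# Illustrative connection; Socorro's crash storage manages its own client.
conn = Elasticsearch("http://localhost:9200")

# The index alone scopes the query; there is no doc_type argument.
search = Search(using=conn, index="crash_reports_example")
search = search.filter({"term": {"processed_crash.product": "Firefox"}})
response = search.execute()
print(response.hits.total)
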
5 changes: 2 additions & 3 deletions socorro/external/es/super_search_fields.py
@@ -289,7 +289,7 @@ def keyword_field(
        "is_exposed": True,
        "is_returned": True,
        "query_type": "string",
-        "storage_mapping": {"type": "keyword", "ignore_above": 10_000},
+        "storage_mapping": {"type": "keyword"},
    }

@@ -1743,8 +1743,7 @@ def apply_schema_properties(fields, schema):
        "namespace": "processed_crash",
        "query_type": "string",
        "storage_mapping": {
-            "fielddata": True,  # FIXME(relud): this may be required in more fields?
-            "type": "text",
+            "type": "keyword",
        },
    },
    "processor_notes": {
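
For context on the mapping changes above: keyword fields hold exact values and can be faceted and sorted directly, while analyzed text fields only allow aggregations when fielddata is enabled, which is costly in heap memory. A rough illustration of the two storage_mapping shapes in the field-definition style used above; "example_label" is a made-up name, not a supersearch field:

text_style_field = {
    "name": "example_label",
    "namespace": "processed_crash",
    "query_type": "string",
    # analyzed text: good for full-text matching, needs fielddata for facets
    "storage_mapping": {"type": "text", "fielddata": True},
}
keyword_style_field = {
    "name": "example_label",
    "namespace": "processed_crash",
    "query_type": "string",
    # exact values: facetable and sortable without fielddata
    "storage_mapping": {"type": "keyword"},
}
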
63 changes: 27 additions & 36 deletions socorro/external/es/supersearch.py
@@ -142,11 +142,11 @@ def _format(aggregation):
            return aggregation

        for i, bucket in enumerate(aggregation["buckets"]):
-            if "key_as_string" in bucket:
+            if bucket["key"] in (True, False):
+                # Restore es 1.4 format for boolean terms as string
+                term = "T" if bucket["key"] else "F"
+            elif "key_as_string" in bucket:
                term = bucket["key_as_string"]
-                if term in ("true", "false"):
-                    # Restore es 1.4 format for boolean terms as string
-                    term = term[:1].upper()
            else:
                term = bucket["key"]

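The rewritten branch above covers the shapes a terms aggregation bucket can take for a boolean field: an already-deserialized True/False key, or an integer 0/1 key alongside a "true"/"false" key_as_string. A small illustrative helper (not part of the module) with the same logic; note that Python treats False == 0 and True == 1, so the membership test also matches integer keys:

def format_bucket_key(bucket):
    # Boolean keys (or 0/1 integer keys) become the legacy "T"/"F" strings.
    if bucket["key"] in (True, False):
        return "T" if bucket["key"] else "F"
    # Other buckets with a formatted representation (dates, for example) use it.
    if "key_as_string" in bucket:
        return bucket["key_as_string"]
    return bucket["key"]

assert format_bucket_key({"key": 1, "key_as_string": "true"}) == "T"
assert format_bucket_key({"key": False}) == "F"
assert format_bucket_key({"key": 1730851200000, "key_as_string": "2024-11-06T00:00:00.000Z"}) == "2024-11-06T00:00:00.000Z"
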
@@ -270,7 +270,7 @@ def get(self, **kwargs):
                operator_range = {">": "gt", "<": "lt", ">=": "gte", "<=": "lte"}

                args = {}
-                filter_type = "term"
+                query_name = "term"
                filter_value = None

                if not param.operator:
@@ -279,70 +279,61 @@ def get(self, **kwargs):
                    # to match for analyzed and non-analyzed supersearch fields.
                    if len(param.value) == 1:
                        # Only one value, so we only do a single match
-                        filter_type = "match"
-                        args = Q({"match": {search_key: param.value[0]}}).to_dict()[
-                            "match"
-                        ]
+                        query_name = "match"
+                        args = {search_key: param.value[0]}
                    else:
                        # Multiple values, so we do multiple matches wrapped in a bool
                        # query where at least one of them should match
-                        filter_type = "bool"
-                        args = Q(
-                            {
-                                "bool": {
-                                    "should": [
-                                        {"match": {search_key: param_value}}
-                                        for param_value in param.value
-                                    ],
-                                    "minimum_should_match": 1,
-                                }
-                            }
-                        ).to_dict()["bool"]
+                        query_name = "bool"
+                        args = {
+                            "should": [
+                                {"match": {search_key: param_value}}
+                                for param_value in param.value
+                            ],
+                            "minimum_should_match": 1,
+                        }
                elif param.operator == "=":
                    # is exactly
                    if field_data["has_full_version"]:
                        search_key = f"{search_key}.full"
                    filter_value = param.value
                elif param.operator in operator_range:
-                    filter_type = "range"
+                    query_name = "range"
                    filter_value = {operator_range[param.operator]: param.value}
                elif param.operator == "__null__":
-                    filter_type = "bool"
+                    query_name = "bool"
                    args["must_not"] = [Q("exists", field=search_key)]
                elif param.operator == "__true__":
-                    filter_type = "term"
+                    query_name = "term"
                    filter_value = True
                elif param.operator == "@":
-                    filter_type = "regexp"
+                    query_name = "regexp"
                    if field_data["has_full_version"]:
                        search_key = f"{search_key}.full"
                    filter_value = param.value
                elif param.operator in operator_wildcards:
-                    filter_type = "wildcard"
+                    query_name = "wildcard"

                    # Wildcard operations are better applied to a non-analyzed
                    # field (called "full") if there is one.
                    if field_data["has_full_version"]:
                        search_key = f"{search_key}.full"

-                    q_args = {}
-                    q_args[search_key] = (
-                        operator_wildcards[param.operator] % param.value
-                    )
-                    query = Q("wildcard", **q_args)
-                    args = query.to_dict()["wildcard"]
+                    args = {
+                        search_key: (operator_wildcards[param.operator] % param.value)
+                    }

                if filter_value is not None:
                    args[search_key] = filter_value

                if args:
-                    new_filter = Q(filter_type, **args)
+                    new_filter = Q(query_name, **args)
                    if param.operator_not:
                        new_filter = ~new_filter

                    if sub_filters is None:
                        sub_filters = new_filter
-                    elif filter_type == "range":
+                    elif query_name == "range":
                        sub_filters &= new_filter
                    else:
                        sub_filters |= new_filter
@@ -569,8 +560,8 @@ def _get_histogram_agg(self, field, intervals):
        # Setting min_doc_count makes ES return only non-empty buckets.
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-histogram-aggregation.html
        kwargs = {"min_doc_count": 1}
-        # NOTE(krzepka) "The date_histogram aggregation’s interval parameter is no longer valid."
-        # https://www.elastic.co/guide/en/elasticsearch/reference/master/migrating-8.0.html
+        # NOTE(relud) We want to use "calendar_interval" for date_histogram and
+        # "interval" for everything else.
        if histogram_type == "date_histogram":
            kwargs["calendar_interval"] = intervals[field]
        else:
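
The NOTE above reflects the Elasticsearch 8 aggregation parameters: date_histogram takes calendar_interval (or fixed_interval), while the plain numeric histogram keeps interval. A minimal elasticsearch_dsl sketch; the index and field names are illustrative:

from elasticsearch_dsl import Search

search = Search(index="crash_reports_example")

# One bucket per calendar day; min_doc_count=1 drops empty buckets.
search.aggs.bucket(
    "histogram_date",
    "date_histogram",
    field="processed_crash.date_processed",
    calendar_interval="1d",
    min_doc_count=1,
)

# Numeric histogram still takes a plain interval.
search.aggs.bucket(
    "histogram_uptime",
    "histogram",
    field="processed_crash.uptime",
    interval=60,
    min_doc_count=1,
)
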
2 changes: 1 addition & 1 deletion socorro/tests/conftest.py
@@ -282,7 +282,7 @@ def get_crash_data(self, crash_id):

        with self.conn() as conn:
            search = Search(using=conn, index=index)
-            search = search.filter("term", **{"processed_crash.uuid": crash_id})
+            search = search.filter({"term": {"processed_crash.uuid": crash_id}})
            results = search.execute().to_dict()

        if results["hits"]["hits"]:
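
The fixture change above passes the term filter as a raw query dict instead of a query name plus keyword arguments; elasticsearch_dsl wraps a dict argument in a Q object, so the dotted field name stays a single key. A short sketch of that form, reusing the uuid from the tests; the index name is illustrative:

from elasticsearch_dsl import Q, Search

crash_id = "936ce666-ff3b-4c7a-9674-367fe2120408"
search = Search(index="crash_reports_example")

# Dict form: the query body is handed over as-is.
search = search.filter({"term": {"processed_crash.uuid": crash_id}})

# Equivalent spelling with an explicit Q object.
search = Search(index="crash_reports_example").filter(
    Q({"term": {"processed_crash.uuid": crash_id}})
)
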
2 changes: 1 addition & 1 deletion socorro/tests/external/es/test_crashstorage.py
@@ -109,7 +109,7 @@ def test_index_crash_indexable_keys(self, es_helper):
            "another_invalid_key": "alpha",
            "date_processed": date_to_string(utc_now()),
            "uuid": "936ce666-ff3b-4c7a-9674-367fe2120408",
-            "dom_fission_enabled": False,
+            "dom_fission_enabled": "1",
        }

        crashstorage = self.build_crashstorage()
2 changes: 1 addition & 1 deletion socorro/tests/external/es/test_super_search_fields.py
@@ -101,7 +101,7 @@ def test_get_mapping(self):
        assert "os_name" in processed_crash
        assert "platform" not in processed_crash

-        # Those fields have a `storage_mapping`.
+        # Test a field that has a storage_mapping.
        assert processed_crash["release_channel"] == {
            "type": "keyword",
        }
139 changes: 60 additions & 79 deletions socorro/tests/external/es/test_supersearch.py
@@ -7,6 +7,7 @@
import http.server
import threading
from contextlib import contextmanager
+from copy import deepcopy

import pytest

@@ -1227,14 +1228,11 @@ def test_get_with_date_histogram(self, es_helper):
        assert "histogram_date" in res["facets"]

        def dt_to_midnight(date):
-            return date.replace(hour=0, minute=0, second=0, microsecond=0)
+            return f"{date.replace(hour=0, minute=0, second=0, microsecond=0):%FT%H:%M:%S.000Z}"

-        # NOTE(relud) this used to use .isoformat() but with elasticsearch 8 dates are
-        # returned with millisecond precision and indicate time zone with Z instead of
-        # +00:00, so we have to carefully format the dates here.
-        today_str = f"{dt_to_midnight(now):%FT%H:%M:%S.000Z}"
-        yesterday_str = f"{dt_to_midnight(yesterday):%FT%H:%M:%S.000Z}"
-        day_before_str = f"{dt_to_midnight(the_day_before):%FT%H:%M:%S.000Z}"
+        today_str = dt_to_midnight(now)
+        yesterday_str = dt_to_midnight(yesterday)
+        day_before_str = dt_to_midnight(the_day_before)

        expected_terms = [
            {
@@ -1784,7 +1782,27 @@ def test_get_with_bad_regex(self, es_helper):
        with pytest.raises(BadArgumentError):
            api.get(signature=r'@"')

-    def test_get_with_failing_shards(self):
+    def test_get_with_failing_shards(self, es_helper):
+        # Generate a shard failure response via bad regex, like test_get_with_bad_regex
+        # above. Modify this response to avoid BadArgumentError and test generic shard
+        # failure.
+        crashstorage = self.build_crashstorage()
+        api = SuperSearchWithFields(crashstorage=crashstorage)
+        now = utc_now()
+        es_helper.index_crash(
+            processed_crash={
+                "uuid": create_new_ooid(timestamp=now),
+                "signature": "test.dll",
+                "date_processed": now,
+            },
+        )
+        bad_regex_query = api.get(signature=r'@"', _return_query=True)
+        search = crashstorage.build_search(index=bad_regex_query["indices"])
+        search = search.filter(bad_regex_query["query"]["query"])
+        search = search.source(bad_regex_query["query"]["_source"])
+        bad_regex_results = search.execute().to_dict()
+
+        # Use a mock es server to return custom results
        ip, port = "127.0.0.1", 9999
        with settings.override(
            **{
@@ -1796,33 +1814,16 @@ def test_get_with_failing_shards(self):
            api = SuperSearchWithFields(crashstorage=crashstorage)

            # Test with one failing shard.
-            es_results = {
-                "hits": {
-                    "hits": [],
-                    "total": {"value": 0, "relation": "eq"},
-                    "max_score": None,
-                },
-                "timed_out": False,
-                "took": 194,
-                "_shards": {
-                    "successful": 9,
-                    "failed": 1,
-                    "total": 10,
-                    "failures": [
-                        {
-                            "status": 500,
-                            "index": "fake_index",
-                            "reason": {
-                                "type": "fake_exception",
-                                "reason": "foo bar gone bad",
-                            },
-                            "shard": 3,
-                        }
-                    ],
-                },
-            }
-
-            with mock_es_server(ip, port, es_results):
+            mock_results = deepcopy(bad_regex_results)
+            mock_results["_shards"]["failures"][0]["index"] = "fake_index"
+            mock_results["_shards"]["failures"][0]["reason"]["reason"] = (
+                "foo bar gone bad"
+            )
+            mock_results["_shards"]["failures"][0]["reason"]["caused_by"]["type"] = (
+                "foo_bar_exception"
+            )
+
+            with mock_es_server(ip, port, mock_results):
                res = api.get()

                assert "errors" in res
@@ -1831,51 +1832,31 @@ def test_get_with_failing_shards(self):
            assert res["errors"] == errors_exp

            # Test with several failures.
-            es_results = {
-                "hits": {
-                    "hits": [],
-                    "total": {"value": 0, "relation": "eq"},
-                    "max_score": None,
-                },
-                "timed_out": False,
-                "took": 194,
-                "_shards": {
-                    "successful": 9,
-                    "failed": 3,
-                    "total": 10,
-                    "failures": [
-                        {
-                            "status": 500,
-                            "index": "fake_index",
-                            "reason": {
-                                "type": "fake_exception",
-                                "reason": "foo bar gone bad",
-                            },
-                            "shard": 2,
-                        },
-                        {
-                            "status": 500,
-                            "index": "fake_index",
-                            "reason": {
-                                "type": "fake_exception",
-                                "reason": "foo bar gone bad",
-                            },
-                            "shard": 3,
-                        },
-                        {
-                            "status": 500,
-                            "index": "other_index",
-                            "reason": {
-                                "type": "fake_exception",
-                                "reason": "foo bar gone bad",
-                            },
-                            "shard": 1,
-                        },
-                    ],
-                },
-            }
+            mock_results = deepcopy(bad_regex_results)
+            mock_results["_shards"]["failed"] += 1
+            mock_results["_shards"]["successful"] -= 1
+            mock_results["_shards"]["failures"][0]["index"] = "fake_index"
+            mock_results["_shards"]["failures"][0]["reason"]["reason"] = (
+                "foo bar gone bad"
+            )
+            mock_results["_shards"]["failures"][0]["reason"]["caused_by"]["type"] = (
+                "foo_bar_exception"
+            )
+            # add failure on different shard
+            mock_results["_shards"]["failures"].append(
+                deepcopy(mock_results["_shards"]["failures"][0])
+            )
+            mock_results["_shards"]["failures"][-1]["shard"] += 1
+            mock_results["_shards"]["failures"][-1]["shard"] %= mock_results["_shards"][
+                "total"
+            ]
+            # add failure on different index
+            mock_results["_shards"]["failures"].append(
+                deepcopy(mock_results["_shards"]["failures"][0])
+            )
+            mock_results["_shards"]["failures"][-1]["index"] = "other_index"

-            with mock_es_server(ip, port, es_results):
+            with mock_es_server(ip, port, mock_results):
                res = api.get()

                assert "errors" in res
