diff --git a/tasks/test_results_processor.py b/tasks/test_results_processor.py index abc6b5f52..1d90ad89f 100644 --- a/tasks/test_results_processor.py +++ b/tasks/test_results_processor.py @@ -4,10 +4,8 @@ import zlib from datetime import date, datetime from io import BytesIO -from sys import getsizeof from typing import List -import sentry_sdk from shared.celery_config import test_results_processor_task_name from shared.config import get_config from shared.yaml import UserYaml @@ -23,7 +21,15 @@ ) from app import celery_app -from database.models import DailyTestRollup, Repository, Test, TestInstance, Upload +from database.models import ( + DailyTestRollup, + Flake, + Repository, + Test, + TestInstance, + Upload, +) +from helpers.metrics import metrics from services.archive import ArchiveService from services.test_results import generate_flags_hash, generate_test_id from services.yaml import read_yaml_field @@ -68,8 +74,21 @@ def run_impl( testrun_dict_list = [] upload_list = [] + f = db_session.query(Flake).all() + + flakes = ( + db_session.query(Flake) + .filter(Flake.repoid == repoid, Flake.end_date.is_(None)) + .all() + ) + + flaky_test_set = set() + + for flake in flakes: + flaky_test_set.add(flake.testid) + # process each report session's test information - with sentry_sdk.metrics.timing("test_results.processor"): + with metrics.timer("test_results.processor"): for args in arguments_list: upload_obj: Upload = ( db_session.query(Upload) @@ -78,7 +97,7 @@ def run_impl( ) res = self.process_individual_upload( - db_session, repoid, commitid, upload_obj + db_session, repoid, commitid, upload_obj, flaky_test_set ) # concat existing and new test information @@ -107,167 +126,145 @@ def _bulk_write_tests_to_db( branch: str, parsed_testruns: List[Testrun], flags_hash: str, + flaky_test_set: set[str], ): - memory_used = getsizeof(parsed_testruns) // 1024 - with sentry_sdk.metrics.timing(key="test_results.processor.write_to_db"): - test_data = [] - test_instance_data = [] - daily_totals = dict() - for testrun in parsed_testruns: - # Build up the data for bulk insert - name = testrun.name - testsuite = testrun.testsuite - outcome = str(testrun.outcome) - duration_seconds = testrun.duration - failure_message = testrun.failure_message - test_id = generate_test_id(repoid, testsuite, name, flags_hash) - - test_data.append( - dict( - id=test_id, - repoid=repoid, - name=name, - testsuite=testsuite, - flags_hash=flags_hash, - ) + test_data = [] + test_instance_data = [] + daily_totals = dict() + for testrun in parsed_testruns: + # Build up the data for bulk insert + name = testrun.name + testsuite = testrun.testsuite + outcome = str(testrun.outcome) + duration_seconds = testrun.duration + failure_message = testrun.failure_message + test_id = generate_test_id(repoid, testsuite, name, flags_hash) + + test_data.append( + dict( + id=test_id, + repoid=repoid, + name=name, + testsuite=testsuite, + flags_hash=flags_hash, ) + ) - test_instance_data.append( - dict( - test_id=test_id, - upload_id=upload_id, - duration_seconds=duration_seconds, - outcome=outcome, - failure_message=failure_message, - commitid=commitid, - branch=branch, - reduced_error_id=None, - repoid=repoid, - ) + test_instance_data.append( + dict( + test_id=test_id, + upload_id=upload_id, + duration_seconds=duration_seconds, + outcome=outcome, + failure_message=failure_message, + commitid=commitid, + branch=branch, + reduced_error_id=None, + repoid=repoid, ) + ) - def update_daily_total(): - daily_totals[test_id]["last_duration_seconds"] = duration_seconds - daily_totals[test_id]["avg_duration_seconds"] = ( - daily_totals[test_id]["avg_duration_seconds"] - * ( - daily_totals[test_id]["pass_count"] - + daily_totals[test_id]["fail_count"] - ) - + duration_seconds - ) / ( + def update_daily_total(): + daily_totals[test_id]["last_duration_seconds"] = duration_seconds + daily_totals[test_id]["avg_duration_seconds"] = ( + daily_totals[test_id]["avg_duration_seconds"] + * ( daily_totals[test_id]["pass_count"] + daily_totals[test_id]["fail_count"] - + 1 ) + + duration_seconds + ) / ( + daily_totals[test_id]["pass_count"] + + daily_totals[test_id]["fail_count"] + + 1 + ) - if outcome == str(Outcome.Pass): - daily_totals[test_id]["pass_count"] += 1 - elif outcome == str(Outcome.Failure) or outcome == str( - Outcome.Error - ): - daily_totals[test_id]["fail_count"] += 1 - elif outcome == str(Outcome.Skip): - daily_totals[test_id]["skip_count"] += 1 - - def create_daily_total(): - daily_totals[test_id] = { - "test_id": test_id, - "repoid": repoid, - "last_duration_seconds": duration_seconds, - "avg_duration_seconds": duration_seconds, - "pass_count": 1 if outcome == str(Outcome.Pass) else 0, - "fail_count": 1 - if outcome == str(Outcome.Failure) - or outcome == str(Outcome.Error) - else 0, - "skip_count": 1 if outcome == str(Outcome.Skip) else 0, - "branch": branch, - "date": date.today(), - "latest_run": datetime.now(), - "commits_where_fail": [commitid] - if ( - outcome == str(Outcome.Failure) - or outcome == str(Outcome.Error) - ) - else [], - } - - if outcome != str(Outcome.Skip): - if test_id in daily_totals: - update_daily_total() - else: - create_daily_total() - - # Save Tests - insert_on_conflict_do_nothing = ( - insert(Test.__table__).values(test_data).on_conflict_do_nothing() - ) - db_session.execute(insert_on_conflict_do_nothing) - db_session.flush() - - # Upsert Daily Test Totals - stmt = insert(DailyTestRollup.__table__).values(list(daily_totals.values())) - stmt = stmt.on_conflict_do_update( - index_elements=[ - "repoid", - "branch", - "test_id", - "date", - ], - set_={ - "last_duration_seconds": stmt.excluded.last_duration_seconds, - "avg_duration_seconds": ( - DailyTestRollup.__table__.c.avg_duration_seconds - * ( - DailyTestRollup.__table__.c.pass_count - + DailyTestRollup.__table__.c.fail_count - ) - + stmt.excluded.avg_duration_seconds + if outcome == str(Outcome.Pass): + daily_totals[test_id]["pass_count"] += 1 + elif outcome == str(Outcome.Failure) or outcome == str(Outcome.Error): + daily_totals[test_id]["fail_count"] += 1 + elif outcome == str(Outcome.Skip): + daily_totals[test_id]["skip_count"] += 1 + + def create_daily_total(): + daily_totals[test_id] = { + "test_id": test_id, + "repoid": repoid, + "last_duration_seconds": duration_seconds, + "avg_duration_seconds": duration_seconds, + "pass_count": 1 if outcome == str(Outcome.Pass) else 0, + "fail_count": 1 + if outcome == str(Outcome.Failure) or outcome == str(Outcome.Error) + else 0, + "skip_count": 1 if outcome == str(Outcome.Skip) else 0, + "flaky_fail_count": 1 if test_id in flaky_test_set else 0, + "branch": branch, + "date": date.today(), + "latest_run": datetime.now(), + "commits_where_fail": [commitid] + if ( + outcome == str(Outcome.Failure) or outcome == str(Outcome.Error) ) - / ( - DailyTestRollup.__table__.c.pass_count - + DailyTestRollup.__table__.c.fail_count - + 1 - ), - "latest_run": stmt.excluded.latest_run, - "pass_count": DailyTestRollup.__table__.c.pass_count - + stmt.excluded.pass_count, - "skip_count": DailyTestRollup.__table__.c.skip_count - + stmt.excluded.skip_count, - "fail_count": DailyTestRollup.__table__.c.fail_count - + stmt.excluded.fail_count, - "commits_where_fail": DailyTestRollup.__table__.c.commits_where_fail - + stmt.excluded.commits_where_fail, - }, - ) + else [], + } + + if outcome != str(Outcome.Skip): + if test_id in daily_totals: + update_daily_total() + else: + create_daily_total() + + # Save Tests + insert_on_conflict_do_nothing = ( + insert(Test.__table__).values(test_data).on_conflict_do_nothing() + ) + db_session.execute(insert_on_conflict_do_nothing) + db_session.flush() + + # Upsert Daily Test Totals + rollup_table = DailyTestRollup.__table__ + stmt = insert(rollup_table).values(list(daily_totals.values())) + stmt = stmt.on_conflict_do_update( + index_elements=[ + "repoid", + "branch", + "test_id", + "date", + ], + set_={ + "last_duration_seconds": stmt.excluded.last_duration_seconds, + "avg_duration_seconds": ( + rollup_table.c.avg_duration_seconds + * (rollup_table.c.pass_count + rollup_table.c.fail_count) + + stmt.excluded.avg_duration_seconds + ) + / (rollup_table.c.pass_count + rollup_table.c.fail_count + 1), + "latest_run": stmt.excluded.latest_run, + "pass_count": rollup_table.c.pass_count + stmt.excluded.pass_count, + "skip_count": rollup_table.c.skip_count + stmt.excluded.skip_count, + "fail_count": rollup_table.c.fail_count + stmt.excluded.fail_count, + "flaky_fail_count": rollup_table.c.flaky_fail_count + + stmt.excluded.flaky_fail_count, + "commits_where_fail": rollup_table.c.commits_where_fail + + stmt.excluded.commits_where_fail, + }, + ) - db_session.execute(stmt) - db_session.flush() + db_session.execute(stmt) + db_session.flush() - # Save TestInstances - insert_test_instances = insert(TestInstance.__table__).values( - test_instance_data - ) - db_session.execute(insert_test_instances) - db_session.flush() - # Memory outside the time metrics to not disturb the counter - # Obviously this is a very rough estimate of sizes. We are interested more - # in the difference between the insert approaches. SO this should be fine. - # And these aux memory structures take the bulk of extra memory we need - memory_used += getsizeof(test_data) // 1024 - memory_used += getsizeof(test_instance_data) // 1024 - sentry_sdk.metrics.gauge( - key="test_results.processor.write_to_db.aux_memory_used", - value=memory_used, - unit="kilobytes", + # Save TestInstances + insert_test_instances = insert(TestInstance.__table__).values( + test_instance_data ) + db_session.execute(insert_test_instances) + db_session.flush() def process_individual_upload( - self, db_session, repoid, commitid, upload_obj: Upload + self, db_session, repoid, commitid, upload_obj: Upload, flaky_test_set: set[str] ): upload_id = upload_obj.id - with sentry_sdk.metrics.timing("test_results.processor.process_individual_arg"): + with metrics.timer("test_results.processor.process_individual_arg"): parsed_testruns: List[Testrun] = self.process_individual_arg( upload_obj, upload_obj.report.commit.repository ) @@ -287,7 +284,14 @@ def process_individual_upload( upload_id = upload_obj.id branch = upload_obj.report.commit.branch self._bulk_write_tests_to_db( - db_session, repoid, commitid, upload_id, branch, parsed_testruns, flags_hash + db_session, + repoid, + commitid, + upload_id, + branch, + parsed_testruns, + flags_hash, + flaky_test_set, ) return { @@ -329,12 +333,11 @@ def parse_single_file( file_bytes: BytesIO, ): try: - with sentry_sdk.metrics.timing("test_results.processor.parser_matching"): + with metrics.timer("test_results.processor.parser_matching"): parser, parsing_function = self.match_report(filename, file_bytes) except ParserNotSupportedError as e: - sentry_sdk.metrics.incr( - "test_results.processor.parsing", - tags={"status": "failure", "reason": "match_report_failure"}, + metrics.incr( + "test_results.processor.parsing.failure.match_report_failure", ) raise ParserFailureError( err_msg="File did not match any parser format", @@ -343,15 +346,14 @@ def parse_single_file( try: file_content = file_bytes.read() - with sentry_sdk.metrics.timing("test_results.processor.file_parsing"): + with metrics.timer("test_results.processor.file_parsing"): res = parsing_function(file_content) except ParserError as e: # aware of cardinality issues with using a variable here in the reason field but # parser is defined by us and limited to the amount of different parsers we will # write, so I don't expect this to be a problem for us - sentry_sdk.metrics.incr( - "test_results.processor.parsing", - tags={"status": "failure", "reason": f"failed_to_parse_{parser}"}, + metrics.incr( + "test_results.processor.parsing.failure.failed_to_parse", ) raise ParserFailureError( err_msg="Error parsing file", @@ -359,9 +361,8 @@ def parse_single_file( parser=parser, parser_err_msg=str(e), ) from e - sentry_sdk.metrics.incr( - "test_results.processor.parsing", - tags={"status": "success", "parser": parser}, + metrics.incr( + "test_results.processor.parsing.success", ) return res diff --git a/tasks/tests/unit/test_test_results_processor_task.py b/tasks/tests/unit/test_test_results_processor_task.py index e10f91c44..c6d3e0736 100644 --- a/tasks/tests/unit/test_test_results_processor_task.py +++ b/tasks/tests/unit/test_test_results_processor_task.py @@ -9,6 +9,7 @@ from database.models import CommitReport from database.models.reports import DailyTestRollup, Test, TestInstance from database.tests.factories import CommitFactory, UploadFactory +from database.tests.factories.reports import FlakeFactory from services.test_results import generate_test_id from tasks.test_results_processor import ( ParserError, @@ -722,6 +723,11 @@ def test_upload_processor_task_call_daily_test_totals( dbsession.add(upload) dbsession.flush() + first_test = dbsession.query(Test).first() + flake = FlakeFactory.create(test=first_test) + dbsession.add(flake) + dbsession.flush() + redis_queue = [{"url": second_url, "upload_pk": upload.id_}] result = TestResultsProcessorTask().run_impl( @@ -758,6 +764,7 @@ def test_upload_processor_task_call_daily_test_totals( assert [r.fail_count for r in rollups] == [1, 0, 0, 1] assert [r.pass_count for r in rollups] == [1, 1, 2, 0] assert [r.skip_count for r in rollups] == [0, 0, 0, 0] + assert [r.flaky_fail_count for r in rollups] == [0, 0, 1, 0] assert [r.commits_where_fail for r in rollups] == [ ["cd76b0821854a780b60012aed85af0a8263004ad"], @@ -774,5 +781,4 @@ def test_upload_processor_task_call_daily_test_totals( ] assert [r.avg_duration_seconds for r in rollups] == [0.001, 7.2, 0.002, 3.6] assert [r.last_duration_seconds for r in rollups] == [0.001, 7.2, 0.002, 3.6] - traveller.stop()