From a740c40df9beaa980843b143aae3b60df699b648 Mon Sep 17 00:00:00 2001
From: Arpad Borsos
Date: Wed, 18 Sep 2024 11:21:25 +0200
Subject: [PATCH] Add metric for uploads per task schedule

The `Upload` task schedules multiple `UploadProcessor` tasks followed by an
`UploadFinisher`. The way uploads are batched into these tasks is an accident
of locking rather than an intentional design. It would be good to know how
many uploads end up grouped into a single processor/finisher chain.
---
 services/bundle_analysis/report.py   |  7 ++---
 services/report/__init__.py          | 12 +++-----
 services/report/report_processor.py  |  5 +---
 services/test_results.py             | 17 +++--------
 tasks/tests/unit/test_upload_task.py |  5 ----
 tasks/upload.py                      | 44 ++++++++++++++++------------
 6 files changed, 37 insertions(+), 53 deletions(-)
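Note: as a rough sketch of how the new metric behaves (assuming
`shared.metrics.Histogram` re-exports `prometheus_client.Histogram`, which
this patch does not show), each `Upload` task run records a single
observation whose value is the size of the scheduled batch:

    from prometheus_client import Histogram

    # Stand-in for the UPLOADS_PER_TASK_SCHEDULE defined in tasks/upload.py
    # below; the buckets mirror the ones declared there.
    UPLOADS_PER_TASK_SCHEDULE = Histogram(
        "worker_uploads_per_schedule",
        "The number of individual uploads scheduled for processing",
        ["report_type"],
        buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 20, 25, 30, 40, 50],
    )

    # One batch of three uploads -> one observation of 3 in the histogram.
    argument_list = [{"reportid": "a"}, {"reportid": "b"}, {"reportid": "c"}]
    UPLOADS_PER_TASK_SCHEDULE.labels(report_type="coverage").observe(
        len(argument_list)
    )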
diff --git a/services/bundle_analysis/report.py b/services/bundle_analysis/report.py
index 5a6ec1cfb..b664bb555 100644
--- a/services/bundle_analysis/report.py
+++ b/services/bundle_analysis/report.py
@@ -4,10 +4,7 @@
 from typing import Any, Dict, Optional
 
 import sentry_sdk
-from shared.bundle_analysis import (
-    BundleAnalysisReport,
-    BundleAnalysisReportLoader,
-)
+from shared.bundle_analysis import BundleAnalysisReport, BundleAnalysisReportLoader
 from shared.bundle_analysis.models import AssetType, MetadataKey
 from shared.bundle_analysis.storage import get_bucket_name
 from shared.django_apps.bundle_analysis.models import CacheConfig
@@ -99,7 +96,7 @@ def update_upload(self, carriedforward: Optional[bool] = False) -> None:
 
 class BundleAnalysisReportService(BaseReportService):
     def initialize_and_save_report(
-        self, commit: Commit, report_code: str = None
+        self, commit: Commit, report_code: str | None = None
     ) -> CommitReport:
         db_session = commit.get_db_session()
 
diff --git a/services/report/__init__.py b/services/report/__init__.py
index 951d4eeb3..db36a982b 100644
--- a/services/report/__init__.py
+++ b/services/report/__init__.py
@@ -109,7 +109,7 @@ def __init__(self, current_yaml: UserYaml):
         self.current_yaml = current_yaml
 
     def initialize_and_save_report(
-        self, commit: Commit, report_code: str = None
+        self, commit: Commit, report_code: str | None = None
     ) -> CommitReport:
         raise NotImplementedError()
 
@@ -150,6 +150,7 @@ def create_report_upload(
             Upload
         """
         db_session = commit_report.get_db_session()
+        name = normalized_arguments.get("name")
         upload = Upload(
             external_id=normalized_arguments.get("reportid"),
             build_code=normalized_arguments.get("build"),
@@ -157,11 +158,7 @@ def create_report_upload(
             env=None,
             report_id=commit_report.id_,
             job_code=normalized_arguments.get("job"),
-            name=(
-                normalized_arguments.get("name")[:100]
-                if normalized_arguments.get("name")
-                else None
-            ),
+            name=(name[:100] if name else None),
             provider=normalized_arguments.get("service"),
             state="started",
             storage_path=normalized_arguments.get("url"),
@@ -295,8 +292,7 @@ def create_report_upload(
         self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
     ) -> Upload:
         upload = super().create_report_upload(normalized_arguments, commit_report)
-        flags = normalized_arguments.get("flags")
-        flags = flags.split(",") if flags else []
+        flags = [f for f in normalized_arguments.get("flags", "").split(",") if f]
         self._attach_flags_to_upload(upload, flags)
 
         # Insert entry in user measurements table only
diff --git a/services/report/report_processor.py b/services/report/report_processor.py
index 6e0e4e0fd..b5a5d98aa 100644
--- a/services/report/report_processor.py
+++ b/services/report/report_processor.py
@@ -81,6 +81,7 @@
 )
 
 
+@sentry_sdk.trace
 def report_type_matching(
     report: ParsedUploadedReportFile, first_line: str
 ) -> (
@@ -200,10 +201,6 @@ def process_report(
             continue
 
         processor_name = type(processor).__name__
-        sentry_sdk.metrics.incr(
-            "services.report.report_processor.parser",
-            tags={"type": processor_name},
-        )
         RAW_REPORT_SIZE.labels(processor=processor_name).observe(report.size)
         with RAW_REPORT_PROCESSOR_RUNTIME_SECONDS.labels(
             processor=processor_name
diff --git a/services/test_results.py b/services/test_results.py
index ece326efb..2be35eaae 100644
--- a/services/test_results.py
+++ b/services/test_results.py
@@ -7,20 +7,12 @@
 from sqlalchemy import desc
 
 from database.enums import ReportType
-from database.models import (
-    Commit,
-    CommitReport,
-    RepositoryFlag,
-    TestInstance,
-    Upload,
-)
+from database.models import Commit, CommitReport, RepositoryFlag, TestInstance, Upload
 from helpers.notifier import BaseNotifier
 from rollouts import FLAKY_SHADOW_MODE, FLAKY_TEST_DETECTION
 from services.license import requires_license
 from services.report import BaseReportService
-from services.repository import (
-    get_repo_provider_service,
-)
+from services.repository import get_repo_provider_service
 from services.urls import get_members_url, get_test_analytics_url
 from services.yaml import read_yaml_field
 
@@ -33,7 +25,7 @@ def __init__(self, current_yaml: UserYaml):
         self.flag_dict = None
 
     def initialize_and_save_report(
-        self, commit: Commit, report_code: str = None
+        self, commit: Commit, report_code: str | None = None
     ) -> CommitReport:
         db_session = commit.get_db_session()
         current_report_row = (
@@ -63,8 +55,7 @@ def create_report_upload(
         self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
     ) -> Upload:
         upload = super().create_report_upload(normalized_arguments, commit_report)
-        flags = normalized_arguments.get("flags")
-        flags = flags or []
+        flags = [f for f in normalized_arguments.get("flags", "").split(",") if f]
         self._attach_flags_to_upload(upload, flags)
         return upload
 
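Note on the flag parsing above: `str.split(",")` never returns an empty list,
so without the truthiness filter a missing "flags" argument would produce a
single empty flag name. A standalone illustration of the edge case the
guarded split avoids:

    # "".split(",") yields [""], not [] -- hence the filter on truthy names.
    assert "".split(",") == [""]
    assert [f for f in "".split(",") if f] == []
    assert [f for f in "unit,integration".split(",") if f] == ["unit", "integration"]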
diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py
index 2c062ebe7..0def612fe 100644
--- a/tasks/tests/unit/test_upload_task.py
+++ b/tasks/tests/unit/test_upload_task.py
@@ -799,7 +799,6 @@ def test_upload_task_no_bot(
         assert commit.message == ""
         assert commit.parent_commit_id is None
         mocked_1.assert_called_with(
-            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -855,7 +854,6 @@ def test_upload_task_bot_no_permissions(
         assert commit.message == ""
         assert commit.parent_commit_id is None
         mocked_1.assert_called_with(
-            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -932,7 +930,6 @@ def test_upload_task_bot_unauthorized(
             .first()
         )
         mocked_schedule_task.assert_called_with(
-            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -1023,7 +1020,6 @@ def fail_if_try_to_create_upload(*args, **kwargs):
         assert commit.report is not None
         assert commit.report.details is not None
         mocked_schedule_task.assert_called_with(
-            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -1157,7 +1153,6 @@ def test_schedule_task_with_one_task(self, dbsession, mocker):
         )
         mock_checkpoints = MagicMock(name="checkpoints")
         result = UploadTask().schedule_task(
-            dbsession,
             commit,
             commit_yaml,
             argument_list,
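Note: the tests above do not assert on the new histogram, but if one wanted
to (a sketch only, assuming the metric lands in the default
`prometheus_client` registry), collected samples can be read back directly:

    from prometheus_client import REGISTRY

    # For a histogram, <name>_sum is the total of observed values (uploads)
    # and <name>_count the number of observations (scheduling rounds).
    def observed_uploads(report_type: str) -> float | None:
        return REGISTRY.get_sample_value(
            "worker_uploads_per_schedule_sum", {"report_type": report_type}
        )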
diff --git a/tasks/upload.py b/tasks/upload.py
index 3b7c99392..bbdce14fe 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -17,6 +17,7 @@
 from shared.django_apps.codecov_metrics.service.codecov_metrics import (
     UserOnboardingMetricsService,
 )
+from shared.metrics import Histogram
 from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError
 from shared.yaml import UserYaml
 from shared.yaml.user_yaml import OwnerContext
@@ -38,7 +39,11 @@
 from services.archive import ArchiveService
 from services.bundle_analysis.report import BundleAnalysisReportService
 from services.redis import download_archive_from_redis, get_redis_connection
-from services.report import NotReadyToBuildReportYetError, ReportService
+from services.report import (
+    BaseReportService,
+    NotReadyToBuildReportYetError,
+    ReportService,
+)
 from services.repository import (
     create_webhook_on_provider,
     fetch_commit_yaml_and_possibly_store,
@@ -60,6 +65,14 @@
 CHUNK_SIZE = 3
 
 
+UPLOADS_PER_TASK_SCHEDULE = Histogram(
+    "worker_uploads_per_schedule",
+    "The number of individual uploads scheduled for processing",
+    ["report_type"],
+    buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 20, 25, 30, 40, 50],
+)
+
+
 class UploadContext:
     """
     Encapsulates the arguments passed to an upload task. This includes both the
@@ -370,7 +383,7 @@ def run_impl(
             retry_countdown = 20 * 2**self.request.retries
             log.warning(
                 "Retrying upload",
-                extra=upload_context.log_extra(countdown=int(retry_countdown)),
+                extra=upload_context.log_extra(countdown=retry_countdown),
             )
             self.retry(
                 max_retries=3,
@@ -408,6 +421,7 @@ def run_impl_within_lock(
         repository = commit.repository
         repository.updatestamp = datetime.now()
         repository_service = None
+        was_updated, was_setup = False, False
 
         try:
             installation_name_to_use = get_installation_name_for_owner_for_task(
@@ -442,6 +456,7 @@
                 extra=upload_context.log_extra(),
                 exc_info=True,
             )
+
         if repository_service:
             commit_yaml = fetch_commit_yaml_and_possibly_store(
                 commit, repository_service
@@ -459,6 +474,7 @@
                 owner_context=context,
             )
 
+        report_service: BaseReportService
         if report_type == ReportType.COVERAGE:
             # TODO: consider renaming class to `CoverageReportService`
             report_service = ReportService(
@@ -474,8 +490,7 @@
         try:
             log.info("Initializing and saving report", extra=upload_context.log_extra())
             commit_report = report_service.initialize_and_save_report(
-                commit,
-                upload_context.report_code,
+                commit, upload_context.report_code
             )
         except NotReadyToBuildReportYetError:
             log.warning(
@@ -503,8 +518,11 @@
 
         if argument_list:
             db_session.commit()
+
+            UPLOADS_PER_TASK_SCHEDULE.labels(report_type=report_type.value).observe(
+                len(argument_list)
+            )
             scheduled_tasks = self.schedule_task(
-                db_session,
                 commit,
                 commit_yaml,
                 argument_list,
@@ -530,11 +548,11 @@
                 "Not scheduling task because there were no arguments found on redis",
                 extra=upload_context.log_extra(),
             )
+
         return {"was_setup": was_setup, "was_updated": was_updated}
 
     def schedule_task(
         self,
-        db_session: Session,
         commit: Commit,
         commit_yaml: UserYaml,
         argument_list: list[dict],
@@ -557,20 +575,12 @@
             )
             assert checkpoints
             return self._schedule_coverage_processing_task(
-                db_session,
-                commit,
-                commit_yaml,
-                argument_list,
-                commit_report,
-                upload_context,
-                checkpoints,
+                commit, commit_yaml, argument_list, commit_report, checkpoints
             )
         elif upload_context.report_type == ReportType.BUNDLE_ANALYSIS:
             assert commit_report.report_type == ReportType.BUNDLE_ANALYSIS.value
             return self._schedule_bundle_analysis_processing_task(
-                commit,
-                commit_yaml,
-                argument_list,
+                commit, commit_yaml, argument_list
             )
         elif upload_context.report_type == ReportType.TEST_RESULTS:
             assert commit_report.report_type == ReportType.TEST_RESULTS.value
@@ -581,12 +591,10 @@ def schedule_task(
 
     def _schedule_coverage_processing_task(
         self,
-        db_session: Session,
         commit: Commit,
         commit_yaml: dict,
         argument_list: list[dict],
         commit_report: CommitReport,
-        upload_context: UploadContext,
         checkpoints: CheckpointLogger,
     ):
         checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)