From f1ab4435c10fe1f838b826a49f8de9b22f258caf Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Thu, 29 Aug 2024 15:32:19 +0200 Subject: [PATCH] Implement fully parallel upload processing This adds another feature/rollout flag which prefers the parallel upload processing pipeline in favor of running it as an experiment. Upload Processing can run in essentially 4 modes: - Completely serial processing - Serial processing, but running "experiment" code (`is_experiment_serial`): - In this mode, each `UploadProcessor` task saves a copy of the raw upload, as well as a copy of the final report (`is_final`) for later verification. - Parallel processing, but running "experiment" code (`is_experiment_parallel`): - In this mode, another parallel set of `UploadProcessor` tasks runs *after* the main set up tasks. - These tasks are using the copied-over raw uploads that were prepared by the `is_experiment_serial` tasks to do their processing. - These tasks are not persisting any of their results in the database, instead the final `UploadFinisher` task will launch the `ParallelVerification` task. - Fully parallel processing (`is_fully_parallel`): - In this mode, the final `UploadFinisher` task is responsible for merging the final report and persisting it. An example Task chain might look like this, in "experiment" mode: - Upload - UploadProcessor (`is_experiment_serial`) - UploadProcessor (`is_experiment_serial`) - UploadProcessor (`is_experiment_serial`, `is_final`) - UploadFinisher - UploadProcessor (`is_experiment_parallel`) - UploadProcessor (`is_experiment_parallel`) - UploadProcessor (`is_experiment_parallel`) - UploadFinisher (`is_experiment_parallel`) - ParallelVerification Once implemented, `is_fully_parallel` will look like this: - Upload - UploadProcessor (`is_fully_parallel`) - UploadProcessor (`is_fully_parallel`) - UploadProcessor (`is_fully_parallel`) - UploadFinisher (`is_fully_parallel`) --- helpers/parallel.py | 100 +++++++++++ helpers/parallel_upload_processing.py | 6 +- rollouts/__init__.py | 1 + services/report/__init__.py | 74 ++++----- tasks/tests/integration/test_upload_e2e.py | 21 ++- .../tests/unit/test_upload_processing_task.py | 6 +- tasks/tests/unit/test_upload_task.py | 15 +- tasks/upload.py | 63 ++++--- tasks/upload_finisher.py | 155 ++++++++++-------- tasks/upload_processor.py | 93 ++++++----- 10 files changed, 341 insertions(+), 193 deletions(-) create mode 100644 helpers/parallel.py diff --git a/helpers/parallel.py b/helpers/parallel.py new file mode 100644 index 000000000..09220b463 --- /dev/null +++ b/helpers/parallel.py @@ -0,0 +1,100 @@ +import dataclasses +from typing import Self + +from rollouts import ( + FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO, + PARALLEL_UPLOAD_PROCESSING_BY_REPO, +) + + +@dataclasses.dataclass(frozen=True) +class ParallelProcessing: + """ + This encapsulates Parallel Upload Processing logic + + Upload Processing can run in essentially 4 modes: + - Completely serial processing + - Serial processing, but running "experiment" code (`is_experiment_serial`): + - In this mode, each `UploadProcessor` task saves a copy of the raw upload, + as well as a copy of the final report (`is_final`) for later verification. + - Parallel processing, but running "experiment" code (`is_experiment_parallel`): + - In this mode, another parallel set of `UploadProcessor` tasks runs *after* + the main set up tasks. + - These tasks are using the copied-over raw uploads that were prepared by + the `is_experiment_serial` tasks to do their processing. + - These tasks are not persisting any of their results in the database, + instead the final `UploadFinisher` task will launch the `ParallelVerification` task. + - Fully parallel processing (`is_fully_parallel`): + - In this mode, the final `UploadFinisher` task is responsible for merging + the final report and persisting it. + + An example Task chain might look like this, in "experiment" mode: + - Upload + - UploadProcessor (`is_experiment_serial`) + - UploadProcessor (`is_experiment_serial`) + - UploadProcessor (`is_experiment_serial`, `is_final`) + - UploadFinisher + - UploadProcessor (`is_experiment_parallel`) + - UploadProcessor (`is_experiment_parallel`) + - UploadProcessor (`is_experiment_parallel`) + - UploadFinisher (`is_experiment_parallel`) + - ParallelVerification + + The `is_fully_parallel` mode looks like this: + - Upload + - UploadProcessor (`is_fully_parallel`) + - UploadProcessor (`is_fully_parallel`) + - UploadProcessor (`is_fully_parallel`) + - UploadFinisher (`is_fully_parallel`) + """ + + run_experiment: bool = False + run_fully_parallel: bool = False + + is_fully_parallel: bool = False + is_experiment_parallel: bool = False + is_experiment_serial: bool = False + is_final: bool = False + parallel_idx: int | None = None + + def initial(repoid: int) -> Self: + run_fully_parallel = FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( + identifier=repoid, default=False + ) + run_experiment = ( + False + if run_fully_parallel + else PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( + identifier=repoid, default=False + ) + ) + + return ParallelProcessing( + run_fully_parallel=run_fully_parallel, + run_experiment=run_experiment, + is_fully_parallel=run_fully_parallel, + ) + + def from_task_args( + repoid: int, + in_parallel: bool = False, + fully_parallel: bool = False, + is_final: bool = False, + parallel_idx: bool | None = None, + **kwargs, + ) -> Self: + slf = ParallelProcessing.initial(repoid) + + if fully_parallel: + return dataclasses.replace(slf, is_fully_parallel=True) + + is_experiment_parallel = slf.run_experiment and in_parallel + is_experiment_serial = slf.run_experiment and not in_parallel + + return dataclasses.replace( + slf, + is_experiment_parallel=is_experiment_parallel, + is_experiment_serial=is_experiment_serial, + is_final=is_final, + parallel_idx=parallel_idx, + ) diff --git a/helpers/parallel_upload_processing.py b/helpers/parallel_upload_processing.py index 75a94ca4e..0edf753bd 100644 --- a/helpers/parallel_upload_processing.py +++ b/helpers/parallel_upload_processing.py @@ -43,16 +43,14 @@ def _adjust_sessions( def get_parallel_session_ids( sessions, argument_list, db_session, report_service, commit_yaml ): - num_sessions = len(argument_list) - mock_sessions = copy.deepcopy(sessions) # the sessions already in the report get_parallel_session_ids = [] # iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic) - for i in range(num_sessions): + for arguments in argument_list: next_session_id = next_session_number(mock_sessions) - upload_pk = argument_list[i]["upload_pk"] + upload_pk = arguments["upload_pk"] upload = db_session.query(Upload).filter_by(id_=upload_pk).first() to_merge_session = report_service.build_session(upload) flags = upload.flag_names diff --git a/rollouts/__init__.py b/rollouts/__init__.py index 67003e32f..2e28e8a6a 100644 --- a/rollouts/__init__.py +++ b/rollouts/__init__.py @@ -11,6 +11,7 @@ ) PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("parallel_upload_processing") +FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("fully_parallel_upload_processing") CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER = Feature("carryforward_base_search_range") diff --git a/services/report/__init__.py b/services/report/__init__.py index 7e8747595..64254f90b 100644 --- a/services/report/__init__.py +++ b/services/report/__init__.py @@ -43,11 +43,9 @@ RepositoryWithoutValidBotError, ) from helpers.labels import get_labels_per_session +from helpers.parallel import ParallelProcessing from helpers.telemetry import MetricContext -from rollouts import ( - CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER, - PARALLEL_UPLOAD_PROCESSING_BY_REPO, -) +from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER from services.archive import ArchiveService from services.redis import ( PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, @@ -61,9 +59,7 @@ RAW_UPLOAD_RAW_REPORT_COUNT, RAW_UPLOAD_SIZE, ) -from services.report.raw_upload_processor import ( - process_raw_upload, -) +from services.report.raw_upload_processor import process_raw_upload from services.repository import get_repo_provider_service from services.yaml.reader import get_paths_from_flags, read_yaml_field @@ -207,7 +203,9 @@ def has_initialized_report(self, commit: Commit) -> bool: @sentry_sdk.trace def initialize_and_save_report( - self, commit: Commit, report_code: str = None + self, + commit: Commit, + report_code: str = None, ) -> CommitReport: """ Initializes the commit report @@ -287,26 +285,28 @@ def initialize_and_save_report( # This means there is a report to carryforward self.save_full_report(commit, report, report_code) + parallel_processing = ParallelProcessing.initial( + commit.repository.repoid + ) # Behind parallel processing flag, save the CFF report to GCS so the parallel variant of # finisher can build off of it later. Makes the assumption that the CFFs occupy the first # j to i session ids where i is the max id of the CFFs and j is some integer less than i. - if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( - identifier=commit.repository.repoid - ): + if parallel_processing.run_experiment: self.save_parallel_report_to_archive(commit, report, report_code) - highest_session_id = max( - report.sessions.keys() - ) # the largest id among the CFFs - get_redis_connection().incrby( - name=get_parallel_upload_processing_session_counter_redis_key( + # the largest id among the CFFs: + highest_session_id = max(report.sessions.keys()) + redis = get_redis_connection() + redis_key = ( + get_parallel_upload_processing_session_counter_redis_key( commit.repository.repoid, commit.commitid - ), + ) + ) + redis.incrby( + name=redis_key, amount=highest_session_id + 1, ) - get_redis_connection().expire( - name=get_parallel_upload_processing_session_counter_redis_key( - commit.repository.repoid, commit.commitid - ), + redis.expire( + name=redis_key, time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, ) @@ -840,7 +840,7 @@ def create_new_report_for_commit(self, commit: Commit) -> Report: @sentry_sdk.trace def parse_raw_report_from_storage( - self, repo: Repository, upload: Upload, is_parallel=False + self, repo: Repository, upload: Upload, parallel_processing: ParallelProcessing ) -> ParsedRawReport: """Pulls the raw uploaded report from storage and parses it so it's easier to access different parts of the raw upload. @@ -851,23 +851,19 @@ def parse_raw_report_from_storage( archive_service = self.get_archive_service(repo) archive_url = upload.storage_path - # TODO: For the parallel experiment, can remove once finished log.info( "Parsing the raw report from storage", extra=dict( commit=upload.report.commit_id, repoid=repo.repoid, archive_url=archive_url, - is_parallel=is_parallel, ), ) # For the parallel upload verification experiment, we need to make a copy of the raw uploaded reports # so that the parallel pipeline can use those to parse. The serial pipeline rewrites the raw uploaded # reports to a human readable version that doesn't include file fixes, so that's why copying is necessary. - if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( - identifier=repo.repoid, default=False - ): + if parallel_processing.run_experiment: parallel_url = archive_url.removesuffix(".txt") + "_PARALLEL.txt" log.info( "In the parallel experiment for parsing raw report in storage", @@ -878,7 +874,7 @@ def parse_raw_report_from_storage( archive_url=archive_url, ), ) - if not is_parallel: + if parallel_processing.is_experiment_serial: archive_file = archive_service.read_file(archive_url) archive_service.write_file(parallel_url, archive_file) log.info( @@ -929,7 +925,7 @@ def build_report_from_raw_content( report: Report, raw_report_info: RawReportInfo, upload: Upload, - parallel_idx=None, + parallel_processing: ParallelProcessing, ) -> ProcessingResult: """ Processes an upload on top of an existing report `master` and returns @@ -965,7 +961,7 @@ def build_report_from_raw_content( try: raw_report = self.parse_raw_report_from_storage( - commit.repository, upload, is_parallel=parallel_idx is not None + commit.repository, upload, parallel_processing ) raw_report_info.raw_report = raw_report except FileNotInStorageError: @@ -977,7 +973,7 @@ def build_report_from_raw_content( reportid=reportid, commit_yaml=self.current_yaml.to_dict(), archive_url=archive_url, - in_parallel=parallel_idx is not None, + parallel_processing=parallel_processing, ), ) result.error = ProcessingError( @@ -997,12 +993,17 @@ def build_report_from_raw_content( flags, session, upload=upload, - parallel_idx=parallel_idx, + parallel_idx=parallel_processing.parallel_idx, ) result.report = process_result.report log.info( "Successfully processed report" - + (" (in parallel)" if parallel_idx is not None else ""), + + ( + " (in parallel)" + if parallel_processing.is_experiment_parallel + or parallel_processing.is_fully_parallel + else "" + ), extra=dict( session=session.id, ci=f"{session.provider}:{session.build}:{session.job}", @@ -1049,13 +1050,6 @@ def update_upload_with_processing_result( db_session = upload_obj.get_db_session() session = processing_result.session if processing_result.error is None: - # this should be enabled for the actual rollout of parallel upload processing. - # if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( - # "this should be the repo id" - # ): - # upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id - # upload_obj.state = "parallel_processed" - # else: upload_obj.state_id = UploadState.PROCESSED.db_id upload_obj.state = "processed" upload_obj.order_number = session.id diff --git a/tasks/tests/integration/test_upload_e2e.py b/tasks/tests/integration/test_upload_e2e.py index 359437631..6fa7a407d 100644 --- a/tasks/tests/integration/test_upload_e2e.py +++ b/tasks/tests/integration/test_upload_e2e.py @@ -13,7 +13,10 @@ from database.models.core import Commit, CompareCommit, Repository from database.tests.factories import CommitFactory, RepositoryFactory from database.tests.factories.core import PullFactory -from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO +from rollouts import ( + FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO, + PARALLEL_UPLOAD_PROCESSING_BY_REPO, +) from services.archive import ArchiveService from services.redis import get_redis_connection from services.report import ReportService @@ -114,9 +117,18 @@ def setup_mock_get_compare( @pytest.mark.integration @pytest.mark.django_db() -@pytest.mark.parametrize("do_parallel_processing", [False, True]) +@pytest.mark.parametrize( + "do_fully_parallel_processing,do_parallel_processing", + [ + (False, False), + (False, True), + (True, True), + ], + ids=["fully synchronous", "parallel experiment", "fully parallel"], +) def test_full_upload( dbsession: Session, + do_fully_parallel_processing: bool, do_parallel_processing: bool, mocker, mock_repo_provider, @@ -146,6 +158,11 @@ def test_full_upload( } ) # use parallel processing: + mocker.patch.object( + FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO, + "check_value", + return_value=do_fully_parallel_processing, + ) mocker.patch.object( PARALLEL_UPLOAD_PROCESSING_BY_REPO, "check_value", diff --git a/tasks/tests/unit/test_upload_processing_task.py b/tasks/tests/unit/test_upload_processing_task.py index 0406bbb37..9d5d18032 100644 --- a/tasks/tests/unit/test_upload_processing_task.py +++ b/tasks/tests/unit/test_upload_processing_task.py @@ -17,6 +17,7 @@ ReportExpiredException, RepositoryWithoutValidBotError, ) +from helpers.parallel import ParallelProcessing from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID from services.archive import ArchiveService from services.report import RawReportInfo, ReportService @@ -346,6 +347,7 @@ def test_upload_processor_call_with_upload_obj( commit_yaml={"codecov": {"max_report_age": False}}, arguments_list=redis_queue, report_code=None, + parallel_processing=ParallelProcessing(), ) expected_result = { "processings_so_far": [ @@ -545,7 +547,7 @@ def test_upload_task_call_exception_within_individual_upload( assert upload.state_id == UploadState.ERROR.db_id assert upload.state == "error" assert not mocked_3.called - mocked_4.assert_called_with(commit.repository, upload, is_parallel=False) + mocked_4.assert_called_with(commit.repository, upload, mocker.ANY) mocked_5.assert_called() @pytest.mark.django_db(databases={"default"}) @@ -715,6 +717,7 @@ def test_upload_task_process_individual_report_with_notfound_report( report=false_report, raw_report_info=RawReportInfo(), upload=upload, + parallel_processing=ParallelProcessing(), ) assert result.error.as_dict() == { "code": "file_not_in_storage", @@ -740,6 +743,7 @@ def test_upload_task_process_individual_report_with_notfound_report_no_retries_y Report(), UploadFactory.create(), RawReportInfo(), + parallel_processing=ParallelProcessing(), ) @pytest.mark.django_db(databases={"default"}) diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 3fdc7a0c8..00906ea5c 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -13,7 +13,6 @@ from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError from shared.torngit.gitlab import Gitlab from shared.utils.sessions import SessionType -from shared.yaml import UserYaml from database.enums import ReportType from database.models import Upload @@ -791,7 +790,7 @@ def test_upload_task_no_bot( mocked_1.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -847,7 +846,7 @@ def test_upload_task_bot_no_permissions( mocked_1.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -924,7 +923,7 @@ def test_upload_task_bot_unauthorized( mocked_schedule_task.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": first_session.id}, {"build": "part2", "url": "url2", "upload_pk": second_session.id}, @@ -1015,7 +1014,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): mocked_schedule_task.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ { "build": "part1", @@ -1135,7 +1134,7 @@ def test_normalize_upload_arguments( def test_schedule_task_with_one_task(self, dbsession, mocker): mocked_chain = mocker.patch("tasks.upload.chain") commit = CommitFactory.create() - commit_yaml = UserYaml({"codecov": {"max_report_age": "100y ago"}}) + commit_yaml = {"codecov": {"max_report_age": "100y ago"}} argument_dict = {"argument_dict": 1} argument_list = [argument_dict] dbsession.add(commit) @@ -1160,7 +1159,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): {}, repoid=commit.repoid, commitid=commit.commitid, - commit_yaml=commit_yaml.to_dict(), + commit_yaml=commit_yaml, arguments_list=argument_list, report_code=None, in_parallel=False, @@ -1170,7 +1169,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): kwargs={ "repoid": commit.repoid, "commitid": commit.commitid, - "commit_yaml": commit_yaml.to_dict(), + "commit_yaml": commit_yaml, "report_code": None, "in_parallel": False, _kwargs_key(UploadFlow): mocker.ANY, diff --git a/tasks/upload.py b/tasks/upload.py index 725f973f7..45eca3be2 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -17,10 +17,7 @@ from shared.django_apps.codecov_metrics.service.codecov_metrics import ( UserOnboardingMetricsService, ) -from shared.torngit.exceptions import ( - TorngitClientError, - TorngitRepoNotFoundError, -) +from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError from shared.yaml import UserYaml from shared.yaml.user_yaml import OwnerContext from sqlalchemy.orm import Session @@ -29,19 +26,14 @@ from database.enums import CommitErrorTypes, ReportType from database.models import Commit, CommitReport from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME -from helpers.checkpoint_logger import ( - CheckpointLogger, - _kwargs_key, -) -from helpers.checkpoint_logger import ( - from_kwargs as checkpoints_from_kwargs, -) +from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.parallel import ParallelProcessing from helpers.parallel_upload_processing import get_parallel_session_ids from helpers.save_commit_error import save_commit_error -from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService from services.bundle_analysis.report import BundleAnalysisReportService from services.redis import ( @@ -518,7 +510,7 @@ def run_impl_within_lock( scheduled_tasks = self.schedule_task( db_session, commit, - commit_yaml, + commit_yaml.to_dict(), argument_list, commit_report, upload_context, @@ -548,14 +540,12 @@ def schedule_task( self, db_session: Session, commit: Commit, - commit_yaml: UserYaml, + commit_yaml: dict, argument_list: list[dict], commit_report: CommitReport, upload_context: UploadContext, checkpoints: CheckpointLogger | None, ): - commit_yaml = commit_yaml.to_dict() - # Carryforward the parent BA report for the current commit's BA report when handling uploads # that's not bundle analysis type. self.possibly_carryforward_bundle_report( @@ -603,6 +593,22 @@ def _schedule_coverage_processing_task( ): checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) + parallel_processing = ParallelProcessing.initial(upload_context.repoid) + + if parallel_processing.run_fully_parallel or parallel_processing.run_experiment: + parallel_tasks = self.create_parallel_tasks( + db_session, + commit, + commit_yaml, + argument_list, + commit_report, + upload_context, + checkpoints, + ) + + if parallel_processing.run_fully_parallel: + return parallel_tasks.apply_async() + processing_tasks = [ upload_processor_task.s( repoid=commit.repoid, @@ -630,16 +636,25 @@ def _schedule_coverage_processing_task( }, ) ) - serial_tasks = chain(processing_tasks) - do_parallel_processing = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( - identifier=commit.repository.repoid - ) - - if not do_parallel_processing: + if not parallel_processing.run_experiment: return serial_tasks.apply_async() + # else: + parallel_shadow_experiment = serial_tasks | parallel_tasks + return parallel_shadow_experiment.apply_async() + @sentry_sdk.trace + def create_parallel_tasks( + self, + db_session: Session, + commit: Commit, + commit_yaml: dict, + argument_list: list[dict], + commit_report: CommitReport, + upload_context: UploadContext, + checkpoints: CheckpointLogger, + ): report_service = ReportService(commit_yaml) sessions = report_service.build_sessions(commit=commit) @@ -691,6 +706,7 @@ def _schedule_coverage_processing_task( parallel_processing_tasks = [ upload_processor_task.s( + {}, repoid=commit.repoid, commitid=commit.commitid, commit_yaml=commit_yaml, @@ -717,8 +733,7 @@ def _schedule_coverage_processing_task( ) parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig) - parallel_shadow_experiment = serial_tasks | parallel_tasks - return parallel_shadow_experiment.apply_async() + return parallel_tasks def _schedule_bundle_analysis_processing_task( self, diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 2c9372334..7b188b0f2 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -21,11 +21,12 @@ from app import celery_app from celery_config import notify_error_task_name from database.models import Commit, Pull +from database.models.core import Repository from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.checkpoint_logger.flows import UploadFlow -from helpers.metrics import KiB, MiB, metrics -from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO +from helpers.metrics import KiB, MiB +from helpers.parallel import ParallelProcessing from services.archive import ArchiveService, MinioEndpoints from services.comparison import get_or_create_comparison from services.redis import get_redis_connection @@ -35,6 +36,7 @@ from tasks.base import BaseCodecovTask from tasks.parallel_verification import parallel_verification_task from tasks.upload_clean_labels_index import task_name as clean_labels_index_task_name +from tasks.upload_processor import UploadProcessorTask log = logging.getLogger(__name__) @@ -99,7 +101,6 @@ def run_impl( repoid, commitid, commit_yaml, - in_parallel=False, report_code=None, **kwargs, ): @@ -127,27 +128,22 @@ def run_impl( assert commit, "Commit not found in database." repository = commit.repository + parallel_processing = ParallelProcessing.from_task_args(repoid, **kwargs) + if ( - PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(identifier=repository.repoid) - and in_parallel + parallel_processing.is_experiment_parallel + or parallel_processing.is_fully_parallel ): - actual_processing_results = { - "processings_so_far": [], - "parallel_incremental_result": [], - } - pr = None - # need to transform processing_results produced by chord to get it into the # same format as the processing_results produced from chain - for task in processing_results: - pr = task["processings_so_far"][0].get("pr") or pr - actual_processing_results["processings_so_far"].append( - task["processings_so_far"][0] - ) - actual_processing_results["parallel_incremental_result"].append( - task["parallel_incremental_result"] - ) - processing_results = actual_processing_results + processing_results = { + "processings_so_far": [ + task["processings_so_far"][0] for task in processing_results + ], + "parallel_incremental_result": [ + task["parallel_incremental_result"] for task in processing_results + ], + } report_service = ReportService(commit_yaml) report = self.merge_incremental_reports( @@ -156,6 +152,7 @@ def run_impl( commit, report_service, processing_results, + parallel_processing, ) log.info( @@ -168,29 +165,42 @@ def run_impl( ), ) - with metrics.timer(f"{self.metrics_prefix}.save_parallel_report_results"): + if parallel_processing.is_fully_parallel: + pr = processing_results["processings_so_far"][0]["arguments"].get("pr") + processor_task = UploadProcessorTask() + processor_task.save_report_results( + db_session, + report_service, + repository, + commit, + report, + pr, + report_code, + ) + + else: parallel_paths = report_service.save_parallel_report_to_archive( commit, report, report_code ) - # now that we've built the report and stored it to GCS, we have what we need to - # compare the results with the current upload pipeline. We end execution of the - # finisher task here so that we don't cause any additional side-effects - - # The verification task that will compare the results of the serial flow and - # the parallel flow, and log the result to determine if parallel flow is - # working properly. - parallel_verification_task.apply_async( - kwargs=dict( - repoid=repoid, - commitid=commitid, - commit_yaml=commit_yaml, - report_code=report_code, - parallel_paths=parallel_paths, - processing_results=processing_results, - ), - ) + # now that we've built the report and stored it to GCS, we have what we need to + # compare the results with the current upload pipeline. We end execution of the + # finisher task here so that we don't cause any additional side-effects + + # The verification task that will compare the results of the serial flow and + # the parallel flow, and log the result to determine if parallel flow is + # working properly. + parallel_verification_task.apply_async( + kwargs=dict( + repoid=repoid, + commitid=commitid, + commit_yaml=commit_yaml, + report_code=report_code, + parallel_paths=parallel_paths, + processing_results=processing_results, + ), + ) - return + return lock_name = f"upload_finisher_lock_{repoid}_{commitid}" redis_connection = get_redis_connection() @@ -477,45 +487,56 @@ def invalidate_caches(self, redis_connection, commit: Commit): def merge_incremental_reports( self, commit_yaml: dict, - repository, + repository: Repository, commit: Commit, report_service: ReportService, processing_results, + parallel_processing: ParallelProcessing, ): archive_service = report_service.get_archive_service(repository) repoid = repository.repoid commitid = commit.id - fas_path = MinioEndpoints.parallel_upload_experiment.get_path( - version="v4", - repo_hash=archive_service.get_archive_hash(repository), - commitid=commit.commitid, - file_name="files_and_sessions", - ) - chunks_path = MinioEndpoints.parallel_upload_experiment.get_path( - version="v4", - repo_hash=archive_service.get_archive_hash(repository), - commitid=commit.commitid, - file_name="chunks", - ) + if parallel_processing.is_fully_parallel: + report = report_service.get_existing_report_for_commit(commit) + if report is None: + log.info( + "No base report found for parallel upload processing, using an empty report", + extra=dict(commit=commitid, repoid=repoid), + ) + report = Report() - try: - files_and_sessions = json.loads(archive_service.read_file(fas_path)) - chunks = archive_service.read_file(chunks_path).decode(errors="replace") - report = report_service.build_report( - chunks, - files_and_sessions["files"], - files_and_sessions["sessions"], - None, + else: + fas_path = MinioEndpoints.parallel_upload_experiment.get_path( + version="v4", + repo_hash=archive_service.get_archive_hash(repository), + commitid=commit.commitid, + file_name="files_and_sessions", ) - except ( - FileNotInStorageError - ): # there were no CFFs, so no report was stored in GCS - log.info( - "No base report found for parallel upload processing, using an empty report", - extra=dict(commit=commitid, repoid=repoid), + chunks_path = MinioEndpoints.parallel_upload_experiment.get_path( + version="v4", + repo_hash=archive_service.get_archive_hash(repository), + commitid=commit.commitid, + file_name="chunks", ) - report = Report() + + try: + files_and_sessions = json.loads(archive_service.read_file(fas_path)) + chunks = archive_service.read_file(chunks_path).decode(errors="replace") + report = report_service.build_report( + chunks, + files_and_sessions["files"], + files_and_sessions["sessions"], + None, + ) + except ( + FileNotInStorageError + ): # there were no CFFs, so no report was stored in GCS + log.info( + "No base report found for parallel upload processing, using an empty report", + extra=dict(commit=commitid, repoid=repoid), + ) + report = Report() log.info( "Downloading %s incremental reports that were processed in parallel", diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py index e231e1de1..57c46d390 100644 --- a/tasks/upload_processor.py +++ b/tasks/upload_processor.py @@ -20,12 +20,12 @@ from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.metrics import metrics +from helpers.parallel import ParallelProcessing from helpers.parallel_upload_processing import ( save_final_serial_report_results, save_incremental_report_results, ) from helpers.save_commit_error import save_commit_error -from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService from services.redis import get_redis_connection from services.report import ProcessingResult, RawReportInfo, Report, ReportService @@ -81,22 +81,22 @@ def run_impl( commit_yaml, arguments_list, report_code=None, - parallel_idx=None, - in_parallel=False, - is_final=False, **kwargs, ): repoid = int(repoid) log.info( "Received upload processor task", - extra=dict(repoid=repoid, commit=commitid, in_parallel=in_parallel), + extra=dict( + repoid=repoid, commit=commitid, in_parallel=kwargs.get("in_parallel") + ), ) - in_parallel = in_parallel and PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( - identifier=repoid - ) + parallel_processing = ParallelProcessing.from_task_args(repoid, **kwargs) - if in_parallel: + if ( + parallel_processing.is_fully_parallel + or parallel_processing.is_experiment_parallel + ): log.info( "Using parallel upload processing, skip acquiring upload processing lock", extra=dict( @@ -107,8 +107,8 @@ def run_impl( ), ) - # This function is named `within_lock` but we gate any concurrency- - # unsafe operations with `PARALLEL_UPLOAD_PROCESSING_BY_REPO`. + # This function is named `_within_lock`, but locking is only necessary + # in the non-parallel variant of this task return self.process_impl_within_lock( db_session=db_session, previous_results={}, @@ -116,9 +116,8 @@ def run_impl( commitid=commitid, commit_yaml=commit_yaml, arguments_list=arguments_list, - parallel_idx=parallel_idx, report_code=report_code, - in_parallel=in_parallel, + parallel_processing=parallel_processing, ) lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) @@ -139,6 +138,15 @@ def run_impl( timeout=max(60 * 5, self.hard_time_limit_task), blocking_timeout=5, ): + log.info( + "Obtained upload processing lock, starting", + extra=dict( + repoid=repoid, + commit=commitid, + parent_task=self.request.parent_id, + report_code=report_code, + ), + ) actual_arguments_list = deepcopy(arguments_list) return self.process_impl_within_lock( db_session=db_session, @@ -148,9 +156,7 @@ def run_impl( commit_yaml=commit_yaml, arguments_list=actual_arguments_list, report_code=report_code, - parallel_idx=parallel_idx, - in_parallel=in_parallel, - is_final=is_final, + parallel_processing=parallel_processing, ) except LockError: max_retry = 200 * 3**self.request.retries @@ -177,21 +183,8 @@ def process_impl_within_lock( commit_yaml: dict, arguments_list, report_code, - parallel_idx=None, - in_parallel=False, - is_final=False, + parallel_processing: ParallelProcessing, ): - if in_parallel: - log.info( - "Obtained upload processing lock, starting", - extra=dict( - repoid=repoid, - commit=commitid, - parent_task=self.request.parent_id, - report_code=report_code, - ), - ) - processings_so_far = previous_results.get("processings_so_far", []) n_processed = 0 n_failed = 0 @@ -206,6 +199,11 @@ def process_impl_within_lock( pr = None report_service = ReportService(UserYaml(commit_yaml)) + in_parallel = ( + parallel_processing.is_experiment_parallel + or parallel_processing.is_fully_parallel + ) + if in_parallel: log.info( "Creating empty report to store incremental result", @@ -213,16 +211,15 @@ def process_impl_within_lock( ) report = Report() else: - with metrics.timer(f"{self.metrics_prefix}.build_original_report"): - report = report_service.get_existing_report_for_commit( - commit, report_code=report_code + report = report_service.get_existing_report_for_commit( + commit, report_code=report_code + ) + if report is None: + log.info( + "No existing report for commit", + extra=dict(commit=commit.commitid), ) - if report is None: - log.info( - "No existing report for commit", - extra=dict(commit=commit.commitid), - ) - report = Report() + report = Report() raw_reports: list[RawReportInfo] = [] try: @@ -261,8 +258,7 @@ def process_impl_within_lock( report, upload_obj, raw_report_info, - parallel_idx=parallel_idx, - in_parallel=in_parallel, + parallel_processing, ) # NOTE: this is only used because test mocking messes with the return value here. # in normal flow, the function mutates the argument instead. @@ -320,7 +316,11 @@ def process_impl_within_lock( results_dict = {} if in_parallel: parallel_incremental_result = save_incremental_report_results( - report_service, commit, report, parallel_idx, report_code + report_service, + commit, + report, + parallel_processing.parallel_idx, + report_code, ) parallel_incremental_result["upload_pk"] = arguments_list[0].get( "upload_pk" @@ -349,7 +349,7 @@ def process_impl_within_lock( # ParallelVerification task to compare with later, for the parallel # experiment. The report being saved is not necessarily the final # report for the commit, as more uploads can still be made. - if is_final: + if parallel_processing.is_final: final_serial_report_url = save_final_serial_report_results( report_service, commit, report, report_code, arguments_list ) @@ -405,11 +405,10 @@ def process_individual_report( report: Report, upload: Upload, raw_report_info: RawReportInfo, - parallel_idx=None, - in_parallel=False, + parallel_processing: ParallelProcessing, ) -> ProcessingResult: processing_result = report_service.build_report_from_raw_content( - report, raw_report_info, upload=upload, parallel_idx=parallel_idx + report, raw_report_info, upload, parallel_processing ) if ( processing_result.error is not None @@ -431,7 +430,7 @@ def process_individual_report( # for the parallel experiment, we don't want to modify anything in the # database, so we disable it here - if not in_parallel: + if not parallel_processing.is_experiment_parallel: report_service.update_upload_with_processing_result( upload, processing_result )