Skip to content

Commit

Permalink
Implement fully parallel upload processing
Browse files Browse the repository at this point in the history
This adds another variant to the `PARALLEL_PROCESSING` feature/rollout flag which prefers the parallel upload processing pipeline in favor of running it as an experiment.

Upload Processing can run in essentially 4 modes:
- Completely serial processing
- Serial processing, but running "experiment" code (`EXPERIMENT_SERIAL`):
  - In this mode, the final (`is_final`) `UploadProcessor` task saves a copy
    of the final report for later verification.
- Parallel processing, but running "experiment" code (`EXPERIMENT_PARALLEL`):
  - In this mode, another parallel set of `UploadProcessor` tasks runs *after*
    the main set up tasks.
  - These tasks are not persisting any of their results in the database,
    instead the final `UploadFinisher` task will launch the `ParallelVerification` task.
- Fully parallel processing (`PARALLEL`):
  - In this mode, the final `UploadFinisher` task is responsible for merging
    the final report and persisting it.

An example Task chain might look like this, in "experiment" mode:
- Upload
  - UploadProcessor
    - UploadProcessor
      - UploadProcessor (`EXPERIMENT_SERIAL` (the final one))
        - UploadFinisher
          - UploadProcessor (`EXPERIMENT_PARALLEL`)
          - UploadProcessor (`EXPERIMENT_PARALLEL`)
          - UploadProcessor (`EXPERIMENT_PARALLEL`)
            - UploadFinisher (`EXPERIMENT_PARALLEL`)
              - ParallelVerification

The `PARALLEL` mode looks like this:
- Upload
  - UploadProcessor (`PARALLEL`)
  - UploadProcessor (`PARALLEL`)
  - UploadProcessor (`PARALLEL`)
    - UploadFinisher (`PARALLEL`)
  • Loading branch information
Swatinem committed Oct 4, 2024
1 parent c97132d commit 3ef30d7
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 156 deletions.
96 changes: 96 additions & 0 deletions helpers/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import annotations

from enum import Enum

from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO

"""
This encapsulates Parallel Upload Processing logic
Upload Processing can run in essentially 4 modes:
- Completely serial processing
- Serial processing, but running "experiment" code (`EXPERIMENT_SERIAL`):
- In this mode, the final (`is_final`) `UploadProcessor` task saves a copy
of the final report for later verification.
- Parallel processing, but running "experiment" code (`EXPERIMENT_PARALLEL`):
- In this mode, another parallel set of `UploadProcessor` tasks runs *after*
the main set up tasks.
- These tasks are not persisting any of their results in the database,
instead the final `UploadFinisher` task will launch the `ParallelVerification` task.
- Fully parallel processing (`PARALLEL`):
- In this mode, the final `UploadFinisher` task is responsible for merging
the final report and persisting it.
An example Task chain might look like this, in "experiment" mode:
- Upload
- UploadProcessor
- UploadProcessor
- UploadProcessor (`EXPERIMENT_SERIAL` (the final one))
- UploadFinisher
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadFinisher (`EXPERIMENT_PARALLEL`)
- ParallelVerification
The `PARALLEL` mode looks like this:
- Upload
- UploadProcessor (`PARALLEL`)
- UploadProcessor (`PARALLEL`)
- UploadProcessor (`PARALLEL`)
- UploadFinisher (`PARALLEL`)
"""


class ParallelFeature(Enum):
SERIAL = "serial"
EXPERIMENT = "experiment"
PARALLEL = "parallel"

@classmethod
def load(cls, repoid: int) -> ParallelFeature:
feature = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=repoid, default="serial"
)

if feature == "experiment" or feature is True:
return ParallelFeature.EXPERIMENT
if feature == "parallel":
return ParallelFeature.PARALLEL
return ParallelFeature.SERIAL


class ParallelProcessing(Enum):
SERIAL = "serial"
EXPERIMENT_SERIAL = "experiment-serial"
EXPERIMENT_PARALLEL = "experiment-parallel"
PARALLEL = "parallel"

@property
def is_parallel(self) -> bool:
return (
self is ParallelProcessing.EXPERIMENT_PARALLEL
or self is ParallelProcessing.PARALLEL
)

@classmethod
def from_task_args(
cls,
repoid: int,
in_parallel: bool = False,
is_final: bool = False,
**kwargs,
) -> ParallelProcessing:
feature = ParallelFeature.load(repoid)

if feature is ParallelFeature.SERIAL:
return ParallelProcessing.SERIAL
if feature is ParallelFeature.PARALLEL:
return ParallelProcessing.PARALLEL

if in_parallel:
return ParallelProcessing.EXPERIMENT_PARALLEL
if is_final:
return ParallelProcessing.EXPERIMENT_SERIAL
return ParallelProcessing.SERIAL
21 changes: 7 additions & 14 deletions services/report/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@
ReportExpiredException,
RepositoryWithoutValidBotError,
)
from helpers.parallel import ParallelFeature
from helpers.telemetry import MetricContext
from rollouts import (
CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER,
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)
from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER
from services.archive import ArchiveService
from services.report.parser import get_proper_parser
from services.report.parser.types import ParsedRawReport
Expand Down Expand Up @@ -255,11 +253,10 @@ def initialize_and_save_report(
# This means there is a report to carryforward
self.save_full_report(commit, report, report_code)

parallel_processing = ParallelFeature.load(commit.repository.repoid)
# Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
# finisher can build off of it later.
if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=commit.repository.repoid
):
if parallel_processing is ParallelFeature.EXPERIMENT:
self.save_parallel_report_to_archive(commit, report, report_code)

return current_report_row
Expand Down Expand Up @@ -790,13 +787,6 @@ def update_upload_with_processing_result(
session = processing_result.session

if processing_result.error is None:
# this should be enabled for the actual rollout of parallel upload processing.
# if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
# "this should be the repo id"
# ):
# upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
# upload_obj.state = "parallel_processed"
# else:
upload.state_id = UploadState.PROCESSED.db_id
upload.state = "processed"
upload.order_number = session.id
Expand Down Expand Up @@ -1032,6 +1022,9 @@ def delete_uploads_by_sessionid(upload: Upload, session_ids: list[int]):
"""
This deletes all the `Upload` records corresponding to the given `session_ids`.
"""
if not session_ids:
return

db_session = upload.get_db_session()
uploads = (
db_session.query(Upload.id_)
Expand Down
12 changes: 6 additions & 6 deletions tasks/tests/integration/test_upload_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ def setup_mocks(

@pytest.mark.integration
@pytest.mark.django_db()
@pytest.mark.parametrize("do_parallel_processing", [False, True])
@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
def test_full_upload(
dbsession: DbSession,
do_parallel_processing: bool,
parallel_processing: str,
mocker,
mock_repo_provider,
mock_storage,
Expand All @@ -176,7 +176,7 @@ def test_full_upload(
mocker.patch.object(
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",
return_value=do_parallel_processing,
return_value=parallel_processing,
)

repository = RepositoryFactory.create()
Expand Down Expand Up @@ -360,10 +360,10 @@ def test_full_upload(

@pytest.mark.integration
@pytest.mark.django_db()
@pytest.mark.parametrize("do_parallel_processing", [False, True])
@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
def test_full_carryforward(
dbsession: DbSession,
do_parallel_processing: bool,
parallel_processing: bool,
mocker,
mock_repo_provider,
mock_storage,
Expand All @@ -378,7 +378,7 @@ def test_full_carryforward(
mocker.patch.object(
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",
return_value=do_parallel_processing,
return_value=parallel_processing,
)

repository = RepositoryFactory.create()
Expand Down
6 changes: 5 additions & 1 deletion tasks/tests/unit/test_upload_processing_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ReportExpiredException,
RepositoryWithoutValidBotError,
)
from helpers.parallel import ParallelProcessing
from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID
from services.archive import ArchiveService
from services.report import ProcessingError, RawReportInfo, ReportService
Expand Down Expand Up @@ -322,14 +323,15 @@ def test_upload_processor_call_with_upload_obj(
redis_queue = [{"url": url, "upload_pk": upload.id_}]
mocked_3 = mocker.patch.object(UploadProcessorTask, "app")
mocked_3.send_task.return_value = True
result = UploadProcessorTask().process_impl_within_lock(
result = UploadProcessorTask().process_upload(
db_session=dbsession,
previous_results={},
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={"codecov": {"max_report_age": False}},
arguments_list=redis_queue,
report_code=None,
parallel_processing=ParallelProcessing.SERIAL,
)

assert result == {
Expand Down Expand Up @@ -726,6 +728,7 @@ def test_upload_task_process_individual_report_with_notfound_report(
report=false_report,
raw_report_info=RawReportInfo(),
upload=upload,
parallel_processing=ParallelProcessing.SERIAL,
)
assert result.error.as_dict() == {
"code": "file_not_in_storage",
Expand All @@ -751,6 +754,7 @@ def test_upload_task_process_individual_report_with_notfound_report_no_retries_y
Report(),
UploadFactory.create(),
RawReportInfo(),
parallel_processing=ParallelProcessing.SERIAL,
)

@pytest.mark.django_db(databases={"default"})
Expand Down
51 changes: 41 additions & 10 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from helpers.parallel import ParallelFeature
from helpers.reports import delete_archive_setting
from helpers.save_commit_error import save_commit_error
from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
from services.archive import ArchiveService
from services.bundle_analysis.report import BundleAnalysisReportService
from services.redis import download_archive_from_redis, get_redis_connection
Expand Down Expand Up @@ -606,9 +606,24 @@ def _schedule_coverage_processing_task(
):
checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)

do_parallel_processing = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=commit.repository.repoid
) and not delete_archive_setting(commit_yaml)
parallel_feature = ParallelFeature.load(upload_context.repoid)
if parallel_feature is ParallelFeature.EXPERIMENT and delete_archive_setting(
commit_yaml
):
parallel_feature = ParallelFeature.SERIAL

Check warning on line 613 in tasks/upload.py

View check run for this annotation

Codecov Notifications / codecov/patch

tasks/upload.py#L613

Added line #L613 was not covered by tests

if parallel_feature is not ParallelFeature.SERIAL:
parallel_tasks = self.create_parallel_tasks(
commit,
commit_yaml,
argument_list,
commit_report,
checkpoints,
parallel_feature is ParallelFeature.PARALLEL,
)

if parallel_feature is ParallelFeature.PARALLEL:
return parallel_tasks.apply_async()

processing_tasks = [
upload_processor_task.s(
Expand All @@ -623,7 +638,7 @@ def _schedule_coverage_processing_task(
for chunk in itertools.batched(argument_list, CHUNK_SIZE)
]
processing_tasks[0].args = ({},) # this is the first `previous_results`
if do_parallel_processing:
if parallel_feature is ParallelFeature.EXPERIMENT:
processing_tasks[-1].kwargs.update(is_final=True)

processing_tasks.append(
Expand All @@ -638,12 +653,24 @@ def _schedule_coverage_processing_task(
},
)
)

serial_tasks = chain(processing_tasks)

if not do_parallel_processing:
return serial_tasks.apply_async()
if parallel_feature is ParallelFeature.EXPERIMENT:
parallel_shadow_experiment = serial_tasks | parallel_tasks
return parallel_shadow_experiment.apply_async()

return serial_tasks.apply_async()

@sentry_sdk.trace
def create_parallel_tasks(
self,
commit: Commit,
commit_yaml: dict,
argument_list: list[dict],
commit_report: CommitReport,
checkpoints: CheckpointLogger,
run_fully_parallel: bool,
):
parallel_processing_tasks = [
upload_processor_task.s(
repoid=commit.repoid,
Expand All @@ -657,6 +684,11 @@ def _schedule_coverage_processing_task(
)
for arguments in argument_list
]
if run_fully_parallel:
for task in parallel_processing_tasks:
# this is the `previous_results`, which celery provides when running
# in a chain as part of the experiment, otherwise we have to provide this.
task.args = ({},)

finish_parallel_sig = upload_finisher_task.signature(
kwargs={
Expand All @@ -670,8 +702,7 @@ def _schedule_coverage_processing_task(
)

parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
parallel_shadow_experiment = serial_tasks | parallel_tasks
return parallel_shadow_experiment.apply_async()
return parallel_tasks

def _schedule_bundle_analysis_processing_task(
self,
Expand Down
Loading

0 comments on commit 3ef30d7

Please sign in to comment.