Only update the Upload once in UploadFinisher
This avoids updating the `Upload` state in the first `UploadProcessor` job, deferring that work to the final `UploadFinisher`, which was doing the same anyway.

This also makes it possible to run the `UPDATE`, as well as all the `INSERT`s for `UploadError` and `UploadLevelTotals` (which we should eventually remove), in bulk.
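As a sketch of what that batching looks like (assuming SQLAlchemy 1.x `Session` semantics; `session_mapping` and the field names are illustrative, not the exact worker code), the per-row loop turns into a single bulk call:

```python
# before: one UPDATE statement issued per `Upload`
for upload_id, session_id in session_mapping.items():
    db_session.query(Upload).filter(Upload.id_ == upload_id).update(
        {Upload.state: "processed", Upload.order_number: session_id}
    )

# after: collect plain dicts (primary key included) and emit one bulk UPDATE
updates = [
    {"id": upload_id, "state": "processed", "order_number": session_id}
    for upload_id, session_id in session_mapping.items()
]
db_session.bulk_update_mappings(Upload, updates)
db_session.flush()
```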
Swatinem committed Oct 31, 2024
1 parent 4ab80f2 commit 3d20406
Showing 9 changed files with 156 additions and 174 deletions.
92 changes: 80 additions & 12 deletions services/processing/merging.py
@@ -1,15 +1,20 @@
import functools
from decimal import Decimal

import sentry_sdk
from shared.reports.editable import EditableReport, EditableReportFile
from shared.reports.enums import UploadState
from shared.reports.resources import Report
from shared.reports.resources import Report, ReportTotals
from shared.yaml import UserYaml
from sqlalchemy.orm import Session as DbSession

from database.models.reports import Upload
from database.models.reports import Upload, UploadError, UploadLevelTotals
from helpers.number import precise_round
from services.report import delete_uploads_by_sessionid
from services.report.raw_upload_processor import clear_carryforward_sessions
from services.yaml.reader import read_yaml_field

from .types import IntermediateReport, MergeResult
from .types import IntermediateReport, MergeResult, ProcessingResult


@sentry_sdk.trace
@@ -49,7 +54,13 @@ def merge_reports(


@sentry_sdk.trace
def update_uploads(db_session: DbSession, merge_result: MergeResult):
def update_uploads(
db_session: DbSession,
commit_yaml: UserYaml,
processing_results: list[ProcessingResult],
intermediate_reports: list[IntermediateReport],
merge_result: MergeResult,
):
"""
Updates all the `Upload` records with the `MergeResult`.
In particular, this updates the `order_number` to match the new `session_id`,
@@ -69,17 +80,74 @@ def update_uploads(db_session: DbSession, merge_result: MergeResult):
db_session, report_id, merge_result.deleted_sessions
)

# then, update all the sessions that have been merged
for upload_id, session_id in merge_result.session_mapping.items():
update = {
Upload.state_id: UploadState.PROCESSED.db_id,
Upload.state: "processed",
Upload.order_number: session_id,
}
db_session.query(Upload).filter(Upload.id_ == upload_id).update(update)
precision: int = read_yaml_field(commit_yaml, ("coverage", "precision"), 2)
rounding: str = read_yaml_field(commit_yaml, ("coverage", "round"), "nearest")
make_totals = functools.partial(make_upload_totals, precision, rounding)

reports = {ir.upload_id: ir.report for ir in intermediate_reports}

# then, update all the `Upload`s with their state and final `order_number`,
# and add `UploadLevelTotals` or `UploadError` records where appropriate.
all_errors: list[UploadError] = []
all_totals: list[UploadLevelTotals] = []
all_upload_updates: list[dict] = []
for result in processing_results:
upload_id = result["upload_id"]

if result["successful"]:
update = {
"state_id": UploadState.PROCESSED.db_id,
"state": "processed",
}
report = reports.get(upload_id)
if report is not None:
all_totals.append(make_totals(upload_id, report.totals))
elif result["error"]:
update = {
"state_id": UploadState.ERROR.db_id,
"state": "error",
}
error = UploadError(
upload_id=upload_id,
error_code=result["error"]["code"],
error_params=result["error"]["params"],
)
all_errors.append(error)

update["id"] = upload_id
order_number = merge_result.session_mapping.get(upload_id)
update["order_number"] = order_number
all_upload_updates.append(update)

db_session.bulk_update_mappings(Upload, all_upload_updates)
db_session.bulk_save_objects(all_errors)
db_session.bulk_save_objects(all_totals)

db_session.flush()


# TODO(swatinem): we should eventually remove `UploadLevelTotals` completely
def make_upload_totals(
precision: int, rounding: str, upload_id: int, totals: ReportTotals
) -> UploadLevelTotals:
if totals.coverage is not None:
coverage = precise_round(Decimal(totals.coverage), precision, rounding)
else:
coverage = Decimal(0)

return UploadLevelTotals(
upload_id=upload_id,
branches=totals.branches,
coverage=coverage,
hits=totals.hits,
lines=totals.lines,
methods=totals.methods,
misses=totals.misses,
partials=totals.partials,
files=totals.files,
)


def change_sessionid(report: EditableReport, old_id: int, new_id: int):
"""
Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead.
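The `INSERT`s follow the same pattern in `update_uploads` above: `UploadError` and `UploadLevelTotals` objects are collected into lists and written with `bulk_save_objects`. A minimal sketch, again assuming SQLAlchemy 1.x, with a hypothetical `failed_results` list standing in for the real processing results:

```python
all_errors: list[UploadError] = []
for result in failed_results:  # hypothetical: results with successful=False
    all_errors.append(
        UploadError(
            upload_id=result["upload_id"],
            error_code=result["error"]["code"],
            error_params=result["error"]["params"],
        )
    )

# one bulk INSERT instead of a db_session.add() per error row
db_session.bulk_save_objects(all_errors)
db_session.flush()
```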
2 changes: 1 addition & 1 deletion services/processing/processing.py
@@ -66,7 +66,7 @@ def process_upload(
result["successful"] = True
log.info("Finished processing upload", extra={"result": result})

report_service.update_upload_with_processing_result(upload, processing_result)
# TODO(swatinem): only save the intermediate report on success
save_intermediate_report(archive_service, commit_sha, upload_id, report)
state.mark_upload_as_processed(upload_id)

4 changes: 3 additions & 1 deletion services/processing/types.py
@@ -3,6 +3,8 @@

from shared.reports.editable import EditableReport

from services.report import ProcessingErrorDict


class UploadArguments(TypedDict):
# TODO(swatinem): migrate this over to `upload_id`
@@ -13,7 +15,7 @@ class ProcessingResult(TypedDict):
upload_id: int
arguments: UploadArguments
successful: bool
error: NotRequired[dict]
error: NotRequired[ProcessingErrorDict]


@dataclass
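Typing `error` as `NotRequired[ProcessingErrorDict]` lets a type checker verify both that the key may be absent and that its payload has the right shape. A simplified, self-contained illustration (the real `ProcessingResult` also carries `arguments`, and `code` is an `UploadErrorCode`):

```python
from typing import NotRequired, TypedDict  # NotRequired: Python 3.11+, else typing_extensions


class ProcessingErrorDict(TypedDict):
    code: str
    params: dict


class ProcessingResult(TypedDict):
    upload_id: int
    successful: bool
    error: NotRequired[ProcessingErrorDict]


ok: ProcessingResult = {"upload_id": 1, "successful": True}
failed: ProcessingResult = {
    "upload_id": 2,
    "successful": False,
    "error": {"code": "report_empty", "params": {}},
}
```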
68 changes: 7 additions & 61 deletions services/report/__init__.py
@@ -5,7 +5,7 @@
import uuid
from dataclasses import dataclass
from time import time
from typing import Any, Mapping, Sequence
from typing import Any, Mapping, Sequence, TypedDict

import orjson
import sentry_sdk
@@ -61,13 +61,18 @@
from services.yaml.reader import get_paths_from_flags, read_yaml_field


class ProcessingErrorDict(TypedDict):
code: UploadErrorCode
params: dict[str, Any]


@dataclass
class ProcessingError:
code: UploadErrorCode
params: dict[str, Any]
is_retryable: bool = False

def as_dict(self):
def as_dict(self) -> ProcessingErrorDict:
return {"code": self.code, "params": self.params}


@@ -760,65 +765,6 @@ def build_report_from_raw_content(
raw_report_info.error = result.error
return result

def update_upload_with_processing_result(
self, upload: Upload, processing_result: ProcessingResult
):
rounding: str = read_yaml_field(
self.current_yaml, ("coverage", "round"), "nearest"
)
precision: int = read_yaml_field(
self.current_yaml, ("coverage", "precision"), 2
)
db_session = upload.get_db_session()
session = processing_result.session

if processing_result.error is None:
upload.state_id = UploadState.PROCESSED.db_id
upload.state = "processed"
upload.order_number = session.id
upload_totals = upload.totals
if upload_totals is None:
upload_totals = UploadLevelTotals(
upload_id=upload.id,
branches=0,
coverage=0,
hits=0,
lines=0,
methods=0,
misses=0,
partials=0,
files=0,
)
db_session.add(upload_totals)
if session.totals is not None:
upload_totals.update_from_totals(
session.totals, precision=precision, rounding=rounding
)

# delete all the carryforwarded `Upload` records corresponding to `Session`s
# which have been removed from the report.
# we always have a `session_adjustment` in the non-error case.
assert processing_result.session_adjustment
deleted_sessions = set(
processing_result.session_adjustment.fully_deleted_sessions
)
if deleted_sessions:
delete_uploads_by_sessionid(
db_session, upload.report_id, deleted_sessions
)

else:
error = processing_result.error
upload.state = "error"
upload.state_id = UploadState.ERROR.db_id
error_obj = UploadError(
upload_id=upload.id,
error_code=error.code,
error_params=error.params,
)
db_session.add(error_obj)
db_session.flush()

@sentry_sdk.trace
def save_report(self, commit: Commit, report: Report, report_code=None):
if len(report._chunks) > 2 * len(report._files) and len(report._files) > 0:
47 changes: 2 additions & 45 deletions services/tests/test_report.py
@@ -3,22 +3,16 @@
import mock
import pytest
from celery.exceptions import SoftTimeLimitExceeded
from shared.reports.enums import UploadState
from shared.reports.resources import Report, ReportFile, Session, SessionType
from shared.reports.types import ReportLine, ReportTotals
from shared.torngit.exceptions import TorngitRateLimitError
from shared.yaml import UserYaml

from database.models import CommitReport, ReportDetails, RepositoryFlag, Upload
from database.tests.factories import CommitFactory, UploadFactory
from database.tests.factories import CommitFactory
from helpers.exceptions import RepositoryWithoutValidBotError
from services.archive import ArchiveService
from services.report import (
NotReadyToBuildReportYetError,
ProcessingError,
ProcessingResult,
ReportService,
)
from services.report import NotReadyToBuildReportYetError, ReportService
from services.report import log as report_log
from services.report.raw_upload_processor import (
SessionAdjustmentResult,
@@ -4029,43 +4023,6 @@ def test_create_report_upload(self, dbsession):
assert first_flag.flag_name == "unittest"
assert first_flag.repository_id == commit.repoid

def test_update_upload_with_processing_result_error(self, mocker, dbsession):
upload_obj = UploadFactory.create(state="started", storage_path="url")
dbsession.add(upload_obj)
dbsession.flush()
assert len(upload_obj.errors) == 0
processing_result = ProcessingResult(
session=mocker.MagicMock(),
error=ProcessingError(code="abclkj", params={"banana": "value"}),
)
ReportService({}).update_upload_with_processing_result(
upload_obj, processing_result
)
dbsession.refresh(upload_obj)
assert upload_obj.state == "error"
assert upload_obj.state_id == UploadState.ERROR.db_id
assert len(upload_obj.errors) == 1
assert upload_obj.errors[0].error_code == "abclkj"
assert upload_obj.errors[0].error_params == {"banana": "value"}
assert upload_obj.errors[0].report_upload == upload_obj

def test_update_upload_with_processing_result_success(self, mocker, dbsession):
upload_obj = UploadFactory.create(state="started", storage_path="url")
dbsession.add(upload_obj)
dbsession.flush()
assert len(upload_obj.errors) == 0
processing_result = ProcessingResult(
session=Session(),
session_adjustment=SessionAdjustmentResult([], []),
)
ReportService({}).update_upload_with_processing_result(
upload_obj, processing_result
)
dbsession.refresh(upload_obj)
assert upload_obj.state == "processed"
assert upload_obj.state_id == UploadState.PROCESSED.db_id
assert len(upload_obj.errors) == 0

def test_shift_carryforward_report(
self, dbsession, sample_report, mocker, mock_repo_provider
):
45 changes: 45 additions & 0 deletions tasks/tests/unit/test_upload_finisher_task.py
@@ -8,9 +8,13 @@
from shared.celery_config import timeseries_save_commit_measurements_task_name
from shared.yaml import UserYaml

from database.models.reports import CommitReport
from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory
from database.tests.factories.core import UploadFactory
from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key
from helpers.checkpoint_logger.flows import UploadFlow
from services.processing.merging import update_uploads
from services.processing.types import MergeResult, ProcessingResult
from tasks.upload_finisher import (
ReportService,
ShouldCallNotifyResult,
@@ -74,6 +78,47 @@ def test_results_arg_new():
]


def test_mark_uploads_as_failed(dbsession):
commit = CommitFactory.create()
dbsession.add(commit)
dbsession.flush()
report = CommitReport(commit_id=commit.id_)
dbsession.add(report)
dbsession.flush()
upload_1 = UploadFactory.create(report=report, state="started", storage_path="url")
upload_2 = UploadFactory.create(report=report, state="started", storage_path="url2")
dbsession.add(upload_1)
dbsession.add(upload_2)
dbsession.flush()

results: list[ProcessingResult] = [
{
"upload_id": upload_1.id,
"successful": False,
"error": {"code": "report_empty", "params": {}},
},
{
"upload_id": upload_2.id,
"successful": False,
"error": {"code": "report_expired", "params": {}},
},
]

update_uploads(dbsession, UserYaml({}), results, [], MergeResult({}, set()))

assert upload_1.state == "error"
assert len(upload_1.errors) == 1
assert upload_1.errors[0].error_code == "report_empty"
assert upload_1.errors[0].error_params == {}
assert upload_1.errors[0].report_upload == upload_1

assert upload_2.state == "error"
assert len(upload_2.errors) == 1
assert upload_2.errors[0].error_code == "report_expired"
assert upload_2.errors[0].error_params == {}
assert upload_2.errors[0].report_upload == upload_2


class TestUploadFinisherTask(object):
@pytest.mark.django_db(databases={"default"})
def test_upload_finisher_task_call(