Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AppBio Quantstudio - support skipping wells that have no results in the raw data file, indicating an omitted well #689

Merged
merged 15 commits into from
Oct 8, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ def iter_comparative_ct_calc_docs(
calc_docs: list[CalculatedDocument | None] = []
for sample, target in view_st_data.iter_keys():
for well_item in view_st_data.get_leaf_item(sample, target):
if not well_item.has_result:
continue

calc_docs.append(build_quantity(view_tr_data, target, well_item))
calc_docs.append(build_amp_score(well_item))
calc_docs.append(build_cq_conf(well_item))
Expand Down Expand Up @@ -636,6 +639,9 @@ def iter_standard_curve_calc_docs(
calc_docs: list[CalculatedDocument | None] = []
for sample, target in view_st_data.iter_keys():
for well_item in view_st_data.get_leaf_item(sample, target):
if not well_item.has_result:
continue

calc_docs.append(build_quantity(view_tr_data, target, well_item))
calc_docs.append(build_amp_score(well_item))
calc_docs.append(build_cq_conf(well_item))
Expand Down Expand Up @@ -666,6 +672,9 @@ def iter_relative_standard_curve_calc_docs(
calc_docs: list[CalculatedDocument | None] = []
for sample, target in view_st_data.iter_keys():
for well_item in view_st_data.get_leaf_item(sample, target):
if not well_item.has_result:
continue

calc_docs.append(build_quantity(view_tr_data, target, well_item))
calc_docs.append(build_amp_score(well_item))
calc_docs.append(build_cq_conf(well_item))
Expand Down Expand Up @@ -699,6 +708,9 @@ def iter_presence_absence_calc_docs(
calc_docs: list[CalculatedDocument | None] = []
for sample, target in view_data.iter_keys():
for well_item in view_data.get_leaf_item(sample, target):
if not well_item.has_result:
continue

calc_docs.append(build_quantity(None, target, well_item))
calc_docs.append(build_amp_score(well_item))
calc_docs.append(build_cq_conf(well_item))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
Metadata,
ProcessedData,
)
from allotropy.exceptions import AllotropeConversionError
from allotropy.parsers.appbio_quantstudio import constants
from allotropy.parsers.appbio_quantstudio.appbio_quantstudio_structure import (
AmplificationData,
Expand Down Expand Up @@ -157,9 +156,11 @@ def _create_measurement(
header: Header,
multicomponent_data: MulticomponentData | None,
melt_curve_raw_data: MeltCurveRawData | None,
amplification_data: AmplificationData,
result: Result,
) -> Measurement:
amplification_data: AmplificationData | None,
result: Result | None,
) -> Measurement | None:
if not result:
return None
# TODO: temp workaround for cal doc result
well_item._result = result

Expand All @@ -184,12 +185,16 @@ def _create_measurement(
sample_role_type=well_item.sample_role_type,
well_location_identifier=well_item.well_location_identifier,
well_plate_identifier=header.barcode,
total_cycle_number_setting=amplification_data.total_cycle_number_setting,
total_cycle_number_setting=amplification_data.total_cycle_number_setting
if amplification_data
else None,
pcr_detection_chemistry=header.pcr_detection_chemistry,
reporter_dye_setting=well_item.reporter_dye_setting,
quencher_dye_setting=well_item.quencher_dye_setting,
passive_reference_dye_setting=header.passive_reference_dye_setting,
processed_data=_create_processed_data(amplification_data, result),
processed_data=_create_processed_data(amplification_data, result)
if amplification_data
else None,
sample_custom_info=well_item.extra_data,
data_cubes=data_cubes,
)
Expand Down Expand Up @@ -241,27 +246,46 @@ def create_calculated_data(
def get_well_item_results(
well_item: WellItem,
results_data: dict[int, dict[str, Result]],
) -> Result:
results_data_element = results_data.get(well_item.identifier, {}).get(
) -> Result | None:
return results_data.get(well_item.identifier, {}).get(
well_item.target_dna_description.replace(" ", "")
)
if results_data_element is None:
msg = f"No result data for well item {well_item.identifier} and target DNA {well_item.target_dna_description}"
raise AllotropeConversionError(msg)
return results_data_element


def get_well_item_amp_data(
well_item: WellItem,
amp_data: dict[int, dict[str, AmplificationData]],
) -> AmplificationData:
amp_data_element = amp_data.get(well_item.identifier, {}).get(
well_item.target_dna_description
) -> AmplificationData | None:
return amp_data.get(well_item.identifier, {}).get(well_item.target_dna_description)


def _create_measurement_group(
header: Header,
well: Well,
amp_data: dict[int, dict[str, AmplificationData]],
multi_data: dict[int, MulticomponentData],
results_data: dict[int, dict[str, Result]],
melt_data: dict[int, MeltCurveRawData],
) -> MeasurementGroup | None:
measurements = [
_create_measurement(
well_item,
header,
multi_data.get(well.identifier),
melt_data.get(well.identifier),
get_well_item_amp_data(well_item, amp_data),
get_well_item_results(well_item, results_data),
)
for well_item in well.items
if get_well_item_results(well_item, results_data)
]
group = MeasurementGroup(
analyst=header.analyst,
experimental_data_identifier=header.experimental_data_identifier,
plate_well_count=try_int_or_nan(header.plate_well_count),
measurements=[m for m in measurements if m is not None],
)
if amp_data_element is None:
msg = f"No amplification data for well item {well_item.identifier} and target DNA {well_item.target_dna_description}"
raise AllotropeConversionError(msg)
return amp_data_element
return group if group.measurements else None


def create_measurement_groups(
Expand All @@ -272,22 +296,10 @@ def create_measurement_groups(
results_data: dict[int, dict[str, Result]],
melt_data: dict[int, MeltCurveRawData],
) -> list[MeasurementGroup]:
return [
MeasurementGroup(
analyst=header.analyst,
experimental_data_identifier=header.experimental_data_identifier,
plate_well_count=try_int_or_nan(header.plate_well_count),
measurements=[
_create_measurement(
well_item,
header,
multi_data.get(well.identifier),
melt_data.get(well.identifier),
get_well_item_amp_data(well_item, amp_data),
get_well_item_results(well_item, results_data),
)
for well_item in well.items
],
groups = [
_create_measurement_group(
header, well, amp_data, multi_data, results_data, melt_data
)
for well in wells
]
return [group for group in groups if group]
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class WellItem(Referenceable):
def __hash__(self) -> int:
return hash(self.identifier)

@property
def has_result(self) -> bool:
return self._result is not None

@property
def result(self) -> Result:
return assert_not_none(self._result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ def iter_relative_standard_curve_calc_docs(
build_relative_rq_min(view_st_data, view_tr_data, sample, target)
)
calc_docs.append(
build_relative_rq_min(view_st_data, view_tr_data, sample, target)
build_relative_rq_max(view_st_data, view_tr_data, sample, target)
nathan-stender marked this conversation as resolved.
Show resolved Hide resolved
)

if target != r_target:
Expand Down
Loading
Loading