Merge pull request #549 from malariagen/GH547_QC_summary_stats
Add sequence QC metadata to sample_metadata()
leehart authored Jun 17, 2024
2 parents 6d638c1 + 275f4c0 commit 54e7a45
Showing 11 changed files with 1,785 additions and 90 deletions.
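For orientation, a minimal usage sketch of the API this PR adds. Illustrative only: the Ag3 entry point and the "AG1000G-AO" sample set are assumptions for the example, not part of this diff.

import malariagen_data

# Assumed entry point, for illustration.
ag3 = malariagen_data.Ag3()

# New accessor added by this PR: one row per sample, using nullable
# dtypes (Int64/Float64) so missing QC values are held as <NA>.
df_qc = ag3.sequence_qc_metadata(sample_sets="AG1000G-AO")

# sample_metadata() now left-merges these QC columns on sample_id.
df_samples = ag3.sample_metadata(sample_sets="AG1000G-AO")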
238 changes: 157 additions & 81 deletions malariagen_data/anoph/sample_metadata.py
@@ -1,6 +1,6 @@
import io
from itertools import cycle
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union

import ipyleaflet # type: ignore
import numpy as np
@@ -54,7 +54,39 @@ def __init__(
# Initialize cache attributes.
self._cache_sample_metadata: Dict = dict()

def _general_metadata_paths(self, *, sample_sets: List[str]) -> Dict[str, str]:
def _parse_metadata_paths(
self,
metadata_paths_func: Callable[[List[str]], Dict[str, str]],
parse_metadata_func: Callable[[str, Union[bytes, Exception]], pd.DataFrame],
sample_sets: Optional[base_params.sample_sets] = None,
) -> pd.DataFrame:
# Normalise input parameters.
sample_sets_prepped = self._prep_sample_sets_param(sample_sets=sample_sets)
del sample_sets

# Obtain paths for all files we need to fetch.
file_paths: Mapping[str, str] = metadata_paths_func(sample_sets_prepped)

# Fetch all files. N.B., this is an optimisation: it allows us to
# fetch multiple files concurrently.
files: Mapping[str, Union[bytes, Exception]] = self.read_files(
paths=file_paths.values(), on_error="return"
)

# Parse files into DataFrames.
dfs = []
for sample_set in sample_sets_prepped:
path = file_paths[sample_set]
data = files[path]
df = parse_metadata_func(sample_set, data)
dfs.append(df)

# Concatenate all DataFrames.
df_ret = pd.concat(dfs, axis=0, ignore_index=True)

return df_ret
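# Illustrative sketch (not from the diff): how the factored-out helper
# above composes, using hypothetical stub callables:
#
#     def paths_func(sample_sets):
#         return {s: f"{s}/metadata.csv" for s in sample_sets}
#
#     def parse_func(sample_set, data):
#         return pd.read_csv(io.BytesIO(data))
#
#     df = self._parse_metadata_paths(
#         metadata_paths_func=paths_func,
#         parse_metadata_func=parse_func,
#         sample_sets=["AG1000G-AO", "AG1000G-BF-A"],
#     )
#
# All files are fetched in a single read_files() call (concurrently,
# with errors returned rather than raised), then parsed per sample set
# and concatenated into one DataFrame.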

def _general_metadata_paths(self, sample_sets: List[str]) -> Dict[str, str]:
paths = dict()
for sample_set in sample_sets:
release = self.lookup_release(sample_set=sample_set)
@@ -64,7 +96,7 @@ def _general_metadata_paths(self, *, sample_sets: List[str]) -> Dict[str, str]:
return paths

def _parse_general_metadata(
self, *, sample_set: str, data: Union[bytes, Exception]
self, sample_set: str, data: Union[bytes, Exception]
) -> pd.DataFrame:
if isinstance(data, bytes):
dtype = {
@@ -117,33 +149,105 @@ def _parse_general_metadata(
def general_metadata(
self, sample_sets: Optional[base_params.sample_sets] = None
) -> pd.DataFrame:
# Normalise input parameters.
sample_sets_prepped = self._prep_sample_sets_param(sample_sets=sample_sets)
del sample_sets

# Obtain paths for all files we need to fetch.
file_paths: Mapping[str, str] = self._general_metadata_paths(
sample_sets=sample_sets_prepped
return self._parse_metadata_paths(
metadata_paths_func=self._general_metadata_paths,
parse_metadata_func=self._parse_general_metadata,
sample_sets=sample_sets,
)

# Fetch all files. N.B., this is an optimisation: it allows us to
# fetch multiple files concurrently.
files: Mapping[str, Union[bytes, Exception]] = self.read_files(
paths=file_paths.values(), on_error="return"
def _sequence_qc_metadata_paths(self, sample_sets: List[str]) -> Dict[str, str]:
paths = dict()
for sample_set in sample_sets:
release = self.lookup_release(sample_set=sample_set)
release_path = self._release_to_path(release=release)
path = (
f"{release_path}/metadata/curation/{sample_set}/sequence_qc_stats.csv"
)
paths[sample_set] = path
return paths

@property
def _sequence_qc_metadata_dtype(self):
# Note: tests expect an ordered dictionary.
# Note: insertion order in dictionary keys is guaranteed since Python 3.7
# Note: using nullable dtypes (e.g. Int64 instead of int64) to allow missing data.

dtype = {
"sample_id": "object",
"mean_cov": "Float64",
"median_cov": "Int64",
"modal_cov": "Int64",
}

for contig in sorted(self.config["CONTIGS"]):
dtype[f"mean_cov_{contig}"] = "Float64"
dtype[f"median_cov_{contig}"] = "Int64"
dtype[f"mode_cov_{contig}"] = "Int64"

dtype.update(
{
"frac_gen_cov": "Float64",
"divergence": "Float64",
"contam_pct": "Float64",
"contam_LLR": "Float64",
}
)

# Parse files into dataframes.
dfs = []
for sample_set in sample_sets_prepped:
path = file_paths[sample_set]
data = files[path]
df = self._parse_general_metadata(sample_set=sample_set, data=data)
dfs.append(df)
return dtype
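# Illustrative aside (not from the diff): the nullable dtypes above let
# integer columns carry missing data, e.g.:
#
#     s = pd.Series([31, None, 28], dtype="Int64")
#     s.isna().sum()  # -> 1; the missing value is held as pd.NA
#
# A plain int64 Series cannot represent missing values; pandas would
# upcast it to float64 instead.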

# Concatenate all dataframes.
df_ret = pd.concat(dfs, axis=0, ignore_index=True)
def _parse_sequence_qc_metadata(
self, sample_set: str, data: Union[bytes, Exception]
) -> pd.DataFrame:
if isinstance(data, bytes):
# Get the dtype of the constant columns.
dtype = self._sequence_qc_metadata_dtype

return df_ret
# Read the CSV using the dtype dict.
df = pd.read_csv(io.BytesIO(data), dtype=dtype, na_values="")

return df

elif isinstance(data, FileNotFoundError):
# Sequence QC metadata are missing for this sample set,
# so return a blank DataFrame.

# Copy the sample ids from the general metadata.
df_general = self.general_metadata(sample_sets=sample_set)
df = df_general[["sample_id"]].copy()

# Add the sequence QC columns, filling integer columns with -1
# and float columns with NaN.
for c, dtype in self._sequence_qc_metadata_dtype.items():
# Keep the sample identifiers copied from the general metadata.
if c == "sample_id":
continue
if pd.api.types.is_integer_dtype(dtype):
# Note: this creates a column with dtype int64.
df[c] = -1
else:
# Note: this creates a column with dtype float64.
df[c] = np.nan

# Set the column data types.
df = df.astype(self._sequence_qc_metadata_dtype)

return df

else:
raise data

@check_types
@doc(
summary="""
Access sequence QC metadata for one or more sample sets.
""",
returns="A pandas DataFrame, one row per sample.",
)
def sequence_qc_metadata(
self, sample_sets: Optional[base_params.sample_sets] = None
) -> pd.DataFrame:
return self._parse_metadata_paths(
metadata_paths_func=self._sequence_qc_metadata_paths,
parse_metadata_func=self._parse_sequence_qc_metadata,
sample_sets=sample_sets,
)

@property
def _cohorts_analysis(self):
@@ -154,7 +258,7 @@ def _cohorts_analysis(self):
# config.
return self.config.get("DEFAULT_COHORTS_ANALYSIS")

def _cohorts_metadata_paths(self, *, sample_sets: List[str]) -> Dict[str, str]:
def _cohorts_metadata_paths(self, sample_sets: List[str]) -> Dict[str, str]:
cohorts_analysis = self._cohorts_analysis
# Guard to ensure this function is only ever called if a cohort
# analysis is configured for this data resource.
@@ -212,7 +316,7 @@ def _cohorts_metadata_dtype(self):
return dtype

def _parse_cohorts_metadata(
self, *, sample_set: str, data: Union[bytes, Exception]
self, sample_set: str, data: Union[bytes, Exception]
) -> pd.DataFrame:
if isinstance(data, bytes):
# Parse CSV data.
@@ -265,34 +369,12 @@ def cohorts_metadata(
) -> pd.DataFrame:
self._require_cohorts_analysis()

# Normalise input parameters.
sample_sets_prepped = self._prep_sample_sets_param(sample_sets=sample_sets)
del sample_sets

# Obtain paths for all files we need to fetch.
file_paths: Mapping[str, str] = self._cohorts_metadata_paths(
sample_sets=sample_sets_prepped
)

# Fetch all files. N.B., this is an optimisation: it allows us to
# fetch multiple files concurrently.
files: Mapping[str, Union[bytes, Exception]] = self.read_files(
paths=file_paths.values(), on_error="return"
return self._parse_metadata_paths(
metadata_paths_func=self._cohorts_metadata_paths,
parse_metadata_func=self._parse_cohorts_metadata,
sample_sets=sample_sets,
)

# Parse files into dataframes.
dfs = []
for sample_set in sample_sets_prepped:
path = file_paths[sample_set]
data = files[path]
df = self._parse_cohorts_metadata(sample_set=sample_set, data=data)
dfs.append(df)

# Concatenate all dataframes.
df_ret = pd.concat(dfs, axis=0, ignore_index=True)

return df_ret

@property
def _aim_analysis(self):
if self._aim_analysis_override:
@@ -302,7 +384,7 @@ def _aim_analysis(self):
# config.
return self.config.get("DEFAULT_AIM_ANALYSIS")

def _aim_metadata_paths(self, *, sample_sets: List[str]) -> Dict[str, str]:
def _aim_metadata_paths(self, sample_sets: List[str]) -> Dict[str, str]:
aim_analysis = self._aim_analysis
# Guard to ensure this function is only ever called if an AIM
# analysis is configured for this data resource.
@@ -316,7 +398,7 @@ def _aim_metadata_paths(self, *, sample_sets: List[str]) -> Dict[str, str]:
return paths

def _parse_aim_metadata(
self, *, sample_set: str, data: Union[bytes, Exception]
self, sample_set: str, data: Union[bytes, Exception]
) -> pd.DataFrame:
assert self._aim_metadata_columns is not None
assert self._aim_metadata_dtype is not None
@@ -360,34 +442,12 @@ def aim_metadata(
) -> pd.DataFrame:
self._require_aim_analysis()

# Normalise input parameters.
sample_sets_prepped = self._prep_sample_sets_param(sample_sets=sample_sets)
del sample_sets

# Obtain paths for all files we need to fetch.
file_paths: Mapping[str, str] = self._aim_metadata_paths(
sample_sets=sample_sets_prepped
)

# Fetch all files. N.B., this is an optimisation: it allows us to
# fetch multiple files concurrently.
files: Mapping[str, Union[bytes, Exception]] = self.read_files(
paths=file_paths.values(), on_error="return"
return self._parse_metadata_paths(
metadata_paths_func=self._aim_metadata_paths,
parse_metadata_func=self._parse_aim_metadata,
sample_sets=sample_sets,
)

# Parse files into dataframes.
dfs = []
for sample_set in sample_sets_prepped:
path = file_paths[sample_set]
data = files[path]
df = self._parse_aim_metadata(sample_set=sample_set, data=data)
dfs.append(df)

# Concatenate all dataframes.
df_ret = pd.concat(dfs, axis=0, ignore_index=True)

return df_ret

@check_types
@doc(
summary="""
@@ -465,13 +525,29 @@ def sample_metadata(

except KeyError:
with self._spinner(desc="Load sample metadata"):
# Build a dataframe from all available metadata.
## Build a single DataFrame using all available metadata.

# Get the general sample metadata.
df_samples = self.general_metadata(sample_sets=prepped_sample_sets)

# Merge with the sequence QC metadata.
df_sequence_qc = self.sequence_qc_metadata(
sample_sets=prepped_sample_sets
)

# Note: merging can change column dtypes
df_samples = df_samples.merge(
df_sequence_qc, on="sample_id", sort=False, how="left"
)
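# Illustrative aside (not from the diff): a left merge can silently
# upcast dtypes, e.g. int64 becomes float64 when unmatched rows
# introduce NaN. The nullable Int64/Float64 QC columns survive the
# merge holding <NA> instead:
#
#     left = pd.DataFrame({"sample_id": ["a", "b"]})
#     right = pd.DataFrame(
#         {"sample_id": ["a"], "median_cov": pd.array([31], dtype="Int64")}
#     )
#     left.merge(right, on="sample_id", how="left")["median_cov"].dtype
#     # -> Int64, with <NA> for sample "b"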

# If available, merge with the AIM metadata.
if self._aim_analysis:
df_aim = self.aim_metadata(sample_sets=prepped_sample_sets)
df_samples = df_samples.merge(
df_aim, on="sample_id", sort=False, how="left"
)

# If available, merge with the cohorts metadata.
if self._cohorts_analysis:
df_cohorts = self.cohorts_metadata(sample_sets=prepped_sample_sets)
df_samples = df_samples.merge(
(remaining lines of this file, and 10 more changed files, not shown)
