Skip to content

Commit

Permalink
add checks for duplicate variables
Browse files Browse the repository at this point in the history
  • Loading branch information
vemonet committed Oct 14, 2024
1 parent f8befd1 commit c919e40
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 76 deletions.
31 changes: 19 additions & 12 deletions backend/src/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ def load_cohort_dict_file(dict_path: str, cohort_id: str) -> Dataset:
# Try to get IDs from old format multiple columns
df["concept_id"] = df.apply(lambda row: get_id_from_multi_columns(row), axis=1)

duplicate_variables = df[df.duplicated(subset=["VARIABLE NAME"], keep=False)]
if not duplicate_variables.empty:
errors.append(f"Duplicate VARIABLE NAME found: {', '.join(duplicate_variables['VARIABLE NAME'].unique())}")

cohort_uri = get_cohort_uri(cohort_id)
g = init_graph()
g.add((cohort_uri, RDF.type, ICARE.Cohort, cohort_uri))
Expand Down Expand Up @@ -375,24 +379,27 @@ async def upload_cohort(
)

# NOTE: waiting for more tests before sending to production
background_tasks.add_task(generate_mappings, cohort_id, metadata_path, g)
# background_tasks.add_task(generate_mappings, cohort_id, metadata_path, g)
# TODO: move all the "delete_existing_triples" and "publish_graph_to_endpoint" logic to the background task after mappings have been generated
# Return "The cohort has been successfully uploaded. The variables are being mapped to standard codes and will be available in the Cohort Explorer in a few minutes."

# # Delete previous graph for this file from triplestore
# delete_existing_triples(
# get_cohort_mapping_uri(cohort_id), f"<{get_cohort_uri(cohort_id)!s}>", "icare:previewEnabled"
# )
# delete_existing_triples(get_cohort_uri(cohort_id))
# publish_graph_to_endpoint(g)

# Delete previous graph for this file from triplestore
delete_existing_triples(
get_cohort_mapping_uri(cohort_id), f"<{get_cohort_uri(cohort_id)!s}>", "icare:previewEnabled"
)
delete_existing_triples(get_cohort_uri(cohort_id))
publish_graph_to_endpoint(g)
except Exception as e:
os.remove(metadata_path)
raise e

# return {
# "message": f"Metadata for cohort {cohort_id} have been successfully uploaded. The variables are being mapped to standard codes and will be available in the Cohort Explorer in a few minutes.",
# "identifier": cohort_id,
# # **cohort.dict(),
# }
return {
"message": f"Metadata for cohort {cohort_id} have been successfully uploaded. The variables are being mapped to standard codes and will be available in the Cohort Explorer in a few minutes.",
"message": f"Metadata for cohort {cohort_id} have been successfully uploaded.",
"identifier": cohort_id,
# **cohort.dict(),
}

def generate_mappings(cohort_id: str, metadata_path: str, g: Graph) -> None:
Expand Down Expand Up @@ -524,7 +531,7 @@ def init_triplestore() -> None:
# NOTE: default airlock preview to false if we ever need to reset cohorts,
# admins can easily download and re-upload the cohorts with the correct airlock value
g = load_cohort_dict_file(file, folder)
g.serialize(f"{settings.data_folder}/cohort_explorer_triplestore.trig", format="trig")
# g.serialize(f"{settings.data_folder}/cohort_explorer_triplestore.trig", format="trig")
if publish_graph_to_endpoint(g):
print(f"💾 Triplestore initialization: added {len(g)} triples for cohorts {file}.")

Expand Down
131 changes: 67 additions & 64 deletions backend/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,70 +123,73 @@ def retrieve_cohorts_metadata(user_email: str) -> dict[str, Cohort]:
results = run_query(get_variables_query)["results"]["bindings"]
cohorts_with_variables = {}
cohorts_without_variables = {}
print(f"Get cohorts metadata query results: {len(results)}")
# print(f"Get cohorts metadata query results: {len(results)}")
for row in results:
cohort_id = str(row["cohortId"]["value"])
var_id = str(row["varName"]["value"]) if "varName" in row else None
# Determine which dictionary to use
target_dict = cohorts_with_variables if var_id else cohorts_without_variables

# Initialize cohort data structure if not exists
if cohort_id and cohort_id not in target_dict:
target_dict[cohort_id] = Cohort(
cohort_id=row["cohortId"]["value"],
cohort_type=get_value("cohortType", row),
cohort_email=[get_value("cohortEmail", row)] if get_value("cohortEmail", row) else [],
# owner=get_value("owner", row),
institution=get_value("cohortInstitution", row),
study_type=get_value("study_type", row),
study_participants=get_value("study_participants", row),
study_duration=get_value("study_duration", row),
study_ongoing=get_value("study_ongoing", row),
study_population=get_value("study_population", row),
study_objective=get_value("study_objective", row),
variables={},
airlock=get_bool_value("airlock", row),
can_edit=user_email in [*settings.admins_list, get_value("cohortEmail", row)],
)
elif get_value("cohortEmail", row) not in target_dict[cohort_id].cohort_email:
# Handle multiple emails for the same cohort
target_dict[cohort_id].cohort_email.append(get_value("cohortEmail", row))
if user_email == get_value("cohortEmail", row):
target_dict[cohort_id].can_edit = True

# Process variables
if "varName" in row and var_id not in target_dict[cohort_id].variables:
target_dict[cohort_id].variables[var_id] = CohortVariable(
var_name=row["varName"]["value"],
var_label=row["varLabel"]["value"],
var_type=row["varType"]["value"],
count=int(row["count"]["value"]),
max=get_value("max", row),
min=get_value("min", row),
units=get_value("units", row),
visits=get_value("visits", row),
formula=get_value("formula", row),
definition=get_value("definition", row),
concept_id=get_curie_value("conceptId", row),
mapped_id=get_curie_value("mappedId", row),
mapped_label=get_value("mappedLabel", row),
omop_domain=get_value("omopDomain", row),
index=get_int_value("index", row),
na=get_int_value("na", row) or 0,
)

# Process categories of variables
if "varName" in row and "categoryLabel" in row and "categoryValue" in row:
new_category = VariableCategory(
value=str(row["categoryValue"]["value"]),
label=str(row["categoryLabel"]["value"]),
concept_id=get_curie_value("categoryConceptId", row),
mapped_id=get_curie_value("categoryMappedId", row),
mapped_label=get_value("categoryMappedLabel", row),
)
# Check for duplicates before appending
if new_category not in target_dict[cohort_id].variables[var_id].categories:
target_dict[cohort_id].variables[var_id].categories.append(new_category)

try:
cohort_id = str(row["cohortId"]["value"])
var_id = str(row["varName"]["value"]) if "varName" in row else None
# Determine which dictionary to use
target_dict = cohorts_with_variables if var_id else cohorts_without_variables

# Initialize cohort data structure if not exists
if cohort_id and cohort_id not in target_dict:
target_dict[cohort_id] = Cohort(
cohort_id=row["cohortId"]["value"],
cohort_type=get_value("cohortType", row),
cohort_email=[get_value("cohortEmail", row)] if get_value("cohortEmail", row) else [],
# owner=get_value("owner", row),
institution=get_value("cohortInstitution", row),
study_type=get_value("study_type", row),
study_participants=get_value("study_participants", row),
study_duration=get_value("study_duration", row),
study_ongoing=get_value("study_ongoing", row),
study_population=get_value("study_population", row),
study_objective=get_value("study_objective", row),
variables={},
airlock=get_bool_value("airlock", row),
can_edit=user_email in [*settings.admins_list, get_value("cohortEmail", row)],
)
elif get_value("cohortEmail", row) not in target_dict[cohort_id].cohort_email:
# Handle multiple emails for the same cohort
target_dict[cohort_id].cohort_email.append(get_value("cohortEmail", row))
if user_email == get_value("cohortEmail", row):
target_dict[cohort_id].can_edit = True

# Process variables
if "varName" in row and var_id not in target_dict[cohort_id].variables:
target_dict[cohort_id].variables[var_id] = CohortVariable(
var_name=row["varName"]["value"],
var_label=row["varLabel"]["value"],
var_type=row["varType"]["value"],
count=int(row["count"]["value"]),
max=get_value("max", row),
min=get_value("min", row),
units=get_value("units", row),
visits=get_value("visits", row),
formula=get_value("formula", row),
definition=get_value("definition", row),
concept_id=get_curie_value("conceptId", row),
mapped_id=get_curie_value("mappedId", row),
mapped_label=get_value("mappedLabel", row),
omop_domain=get_value("omopDomain", row),
index=get_int_value("index", row),
na=get_int_value("na", row) or 0,
)
# raise Exception(f"OLALALA")

# Process categories of variables
if "varName" in row and "categoryLabel" in row and "categoryValue" in row:
new_category = VariableCategory(
value=str(row["categoryValue"]["value"]),
label=str(row["categoryLabel"]["value"]),
concept_id=get_curie_value("categoryConceptId", row),
mapped_id=get_curie_value("categoryMappedId", row),
mapped_label=get_value("categoryMappedLabel", row),
)
# Check for duplicates before appending
if new_category not in target_dict[cohort_id].variables[var_id].categories:
target_dict[cohort_id].variables[var_id].categories.append(new_category)
except Exception as e:
print(f"Error processing row {row}: {e}")
# Merge dictionaries, cohorts with variables first
return {**cohorts_with_variables, **cohorts_without_variables}

0 comments on commit c919e40

Please sign in to comment.