Modify to be true JSON schema using $ref and $defs for custom objects (image, file) #85
donaldcampbelljr committed Oct 30, 2023
1 parent 6301b86 commit b71d387
Showing 5 changed files with 73 additions and 197 deletions.
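At a high level, this change replaces pipestat's custom `file` and `image` result types with standard JSON Schema `$ref` pointers that are resolved against a shared `$defs` block at parse time. A minimal before/after sketch of the schema shape (the `output_file` field name is borrowed from the test schema in this diff):

```python
# Before: a pipestat-specific custom type, expanded by _recursively_replace_custom_types
old_style = {"output_file": {"type": "file"}}

# After: a true JSON Schema reference, resolved by the new replace_JSON_refs helper
new_style = {"output_file": {"$ref": "#/$defs/file"}}
```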
2 changes: 1 addition & 1 deletion pipestat/backends/db_backend/dbbackend.py
@@ -53,7 +53,7 @@ def __init__(
:param dict status_schema_source: filepath of status schema
:param str result_formatter: function for formatting result
"""
_LOGGER.warning(f"Initializing DBBackend for pipeline '{pieline_name}'")
_LOGGER.warning(f"Initializing DBBackend for pipeline '{pipeline_name}'")
self.pipeline_name = pipeline_name
self.pipeline_type = pipeline_type or "sample"
self.record_identifier = record_identifier
206 changes: 39 additions & 167 deletions pipestat/parsed_schema.py
@@ -28,28 +28,6 @@

# The columns associated with the file and image types
PATH_COL_SPEC = (Path, ...)
# TITLE_COL_SPEC = (Optional[str], Field(default=None))
# THUMBNAIL_COL_SPEC = (Optional[Path], Field(default=None))


# def _custom_types_column_specifications():
# """Collection of the column specifications for the custom types"""
# return {
# "path": PATH_COL_SPEC,
# "title": TITLE_COL_SPEC,
# "thumbnail": THUMBNAIL_COL_SPEC,
# }


# def get_base_model():
# class BaseModel(SQLModel):
# __table_args__ = {"extend_existing": True}
#
# class Config:
# arbitrary_types_allowed = True
#
# # return SQLModel
# return BaseModel


def _safe_pop_one_mapping(
@@ -103,6 +81,7 @@ def __init__(self, data: Union[Dict[str, Any], Path, str]) -> None:
# Maintain backwards compatibility with the old output schema while also supporting a true JSON schema:
if "properties" in list(data.keys()):
# Assume top-level properties key implies proper JSON schema.

self._pipeline_name = data["properties"].pop(SCHEMA_PIPELINE_NAME_KEY, None)

sample_data = _safe_pop_one_mapping(
@@ -126,6 +105,10 @@ def __init__(self, data: Union[Dict[str, Any], Path, str]) -> None:
mappingkey="properties",
)

sample_data = replace_JSON_refs(sample_data, data)

prj_data = replace_JSON_refs(prj_data, data)

else:
self._pipeline_name = data.pop(SCHEMA_PIPELINE_NAME_KEY, None)
sample_data = _safe_pop_one_mapping(
@@ -225,38 +208,6 @@ def sample_table_name(self):
"""Return the name of the database table for sample-level information."""
return self._table_name("sample")

#
# def _make_field_definitions(self, data: Dict[str, Any], require_type: bool):
# # TODO: default to string if no type key?
# # TODO: parse "required" ?
# defs = {}
# for name, subdata in data.items():
# try:
# typename = subdata[SCHEMA_TYPE_KEY]
# except KeyError:
# if require_type:
# _LOGGER.error(f"'{SCHEMA_TYPE_KEY}' is required for each schema element")
# raise
# else:
# data_type = str
# else:
# data_type = self._get_data_type(typename)
# if data_type == CLASSES_BY_TYPE["object"] or data_type == CLASSES_BY_TYPE["array"]:
# defs[name] = (
# data_type,
# Field(sa_column=Column(JSONB), default=null()),
# )
# else:
# defs[name] = (
# # Optional[subdata[SCHEMA_TYPE_KEY]],
# # subdata[SCHEMA_TYPE_KEY],
# # Optional[str],
# # CLASSES_BY_TYPE[subdata[SCHEMA_TYPE_KEY]],
# data_type,
# Field(default=subdata.get("default")),
# )
# return defs

@staticmethod
def _get_data_type(type_name):
t = CLASSES_BY_TYPE[type_name]
@@ -267,30 +218,6 @@ def _get_data_type(type_name):
def file_like_table_name(self):
return self._table_name("files")

# def build_model(self, pipeline_type):
# if pipeline_type == "project":
# data = self.project_level_data
# # if using the same output schema and thus, pipeline name for samples and project
# # we must ensure there are distinct table names in the same database.
# table_name = self.project_table_name
#
# if pipeline_type == "sample":
# data = self.sample_level_data
# table_name = self.sample_table_name
#
# if not self.sample_level_data and not self.project_level_data:
# return None
#
# field_defs = self._make_field_definitions(data, require_type=True)
# field_defs = self._add_status_field(field_defs)
# field_defs = self._add_record_identifier_field(field_defs)
# field_defs = self._add_id_field(field_defs)
# # field_defs = self._add_project_name_field(field_defs)
# field_defs = self._add_pipeline_name_field(field_defs)
# field_defs = self._add_created_time_field(field_defs)
# field_defs = self._add_modified_time_field(field_defs)
# return _create_model(table_name, **field_defs)

def to_dict(self) -> Dict[str, Any]:
"""Create simple dictionary representation of this instance."""
data = {SCHEMA_PIPELINE_NAME_KEY: self.pipeline_name}
@@ -303,99 +230,10 @@ def to_dict(self) -> Dict[str, Any]:
data[key] = values
return data

#
# @staticmethod
# def _add_project_name_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if PROJECT_NAME in field_defs:
# raise SchemaError(
# f"'{PROJECT_NAME}' is reserved as identifier and can't be part of schema."
# )
# field_defs[PROJECT_NAME] = (str, Field(default=None))
#
# return field_defs
#
# @staticmethod
# def _add_pipeline_name_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if PIPELINE_NAME in field_defs:
# raise SchemaError(
# f"'{PIPELINE_NAME}' is reserved as identifier and can't be part of schema."
# )
# field_defs[PIPELINE_NAME] = (str, Field(default=None))
#
# return field_defs
#
# @staticmethod
# def _add_id_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if ID_KEY in field_defs:
# raise SchemaError(
# f"'{ID_KEY}' is reserved for primary key and can't be part of schema."
# )
# field_defs[ID_KEY] = (
# Optional[int],
# Field(default=None, primary_key=True),
# )
# return field_defs
#
# @staticmethod
# def _add_record_identifier_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if RECORD_IDENTIFIER in field_defs:
# raise SchemaError(
# f"'{RECORD_IDENTIFIER}' is reserved as identifier and can't be part of schema."
# )
# field_defs[RECORD_IDENTIFIER] = (str, Field(default=None))
# return field_defs
#
# @staticmethod
# def _add_sample_name_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if SAMPLE_NAME in field_defs:
# raise SchemaError(
# f"'{SAMPLE_NAME}' is reserved as identifier and can't be part of schema."
# )
# field_defs[SAMPLE_NAME] = (str, Field(default=None))
# return field_defs
#
# @staticmethod
# def _add_status_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if STATUS in field_defs:
# raise SchemaError(
# f"'{STATUS}' is reserved for status reporting and can't be part of schema."
# )
# field_defs[STATUS] = (str, Field(default=None))
# return field_defs
#
# @staticmethod
# def _add_created_time_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if CREATED_TIME in field_defs:
# raise SchemaError(
# f"'{CREATED_TIME}' is reserved for time reporting and can't be part of schema."
# )
# field_defs[CREATED_TIME] = (datetime.datetime, Field(default=None))
# return field_defs
#
# @staticmethod
# def _add_modified_time_field(field_defs: Dict[str, Any]) -> Dict[str, Any]:
# if MODIFIED_TIME in field_defs:
# raise SchemaError(
# f"'{MODIFIED_TIME}' is reserved for time reporting and can't be part of schema."
# )
# field_defs[MODIFIED_TIME] = (datetime.datetime, Field(default=None))
# return field_defs

def _table_name(self, suffix: str) -> str:
return f"{self.pipeline_name}__{suffix}"


#
# def _create_model(table_name: str, **kwargs):
# return create_model(
# table_name,
# __base__=get_base_model(),
# __cls_kwargs__={"table": True},
# **kwargs,
# )
#


def _recursively_replace_custom_types(s: Dict[str, Any]) -> Dict[str, Any]:
"""
Replace the custom types in pipestat schema with canonical types
@@ -423,3 +261,37 @@ def _recursively_replace_custom_types(s: Dict[str, Any]) -> Dict[str, Any]:
spec.setdefault("required", []).extend(curr_type_spec["required"])
spec[SCHEMA_TYPE_KEY] = curr_type_spec[SCHEMA_TYPE_KEY]
return s
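For contrast with the new `$ref` mechanism added below, here is a hedged sketch of what the legacy custom-type expansion produces; the exact canonical spec is not shown in this hunk and is assumed to mirror the `$defs` entries added to the test schema in this diff:

```python
# Hypothetical input using a legacy custom type:
legacy = {"output_file": {"type": "file", "description": "An example file."}}

# Assumed shape after _recursively_replace_custom_types(legacy):
# {
#     "output_file": {
#         "type": "object",
#         "description": "An example file.",
#         "properties": {"path": {"type": "string"}, "title": {"type": "string"}},
#         "required": ["path", "title"],
#     }
# }
```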


def replace_JSON_refs(s: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively search and replace the $refs if they exist in schema, s, and if their corresponding $defs exist in
source schema, data
:param dict s: schema to replace types in
:param dict data: source schema
:return dict s: schema with types replaced
"""

for k, v in list(s.items()):
if type(v) == dict:
replace_JSON_refs(s[k], data)
if "$ref" == k:
split_value = v.split("/")
if len(split_value) != 3:
raise SchemaError(
msg=f"$ref exists in source schema but path,{v} ,not valid, e.g. '#/$defs/file' "
)
if split_value[1] in data and split_value[2] in data[split_value[1]]:
result = data[split_value[1]][split_value[2]]
else:
result = None
if result is not None:
s.update({split_value[2]: result})
s.update({"type": "object"})
del s["$ref"]
else:
raise SchemaError(
msg=f"Could not find {split_value[1]} and {split_value[2]} in $def"
)
return s
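A short usage sketch of the helper above; note that the referenced definition is inlined in place under its own name (here `file`) rather than under `properties`, and the node is tagged `type: object` before the `$ref` key is dropped:

```python
source = {
    "$defs": {
        "file": {
            "type": "object",
            "object_type": "file",
            "properties": {"path": {"type": "string"}, "title": {"type": "string"}},
            "required": ["path", "title"],
        }
    }
}
schema = {"output_file": {"$ref": "#/$defs/file", "description": "An example file."}}

resolved = replace_JSON_refs(schema, source)
# resolved["output_file"] == {
#     "description": "An example file.",
#     "file": source["$defs"]["file"],
#     "type": "object",
# }
```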
59 changes: 32 additions & 27 deletions tests/data/output_schema_as_JSON_schema.yaml
@@ -9,42 +9,30 @@ properties:
number_of_things:
type: integer
description: "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
smooth_bw:
type: string
description: "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce nec cursus nulla."
path: "aligned_{genome}/{sample_name}_smooth.bw"
output_image:
type: image
$ref: "#/$defs/image"
description: "This an output image"
properties:
path: "path string"
thumbnail_path: "path string to thumbnail"
title: "image title"
output_file:
type: file
$ref: "#/$defs/file"
description: "This a path to the output image"
properties:
path:
type: string
description: "path to string"
collection_of_images:
type: array
description: A collection of images.
items:
type: object
properties:
prop1:
type: file
$ref: "#/$defs/file"
description: An example file.
output_file_in_object:
nested_object:
type: object
description: An object containing output files.
description: An object containing an output file and an image.
properties:
example_property_1:
type: file
$ref: "#/$defs/file"
description: An example file.
example_property_2:
type: image
$ref: "#/$defs/image"
description: An example image.
output_file_nested_object:
type: object
@@ -55,21 +43,38 @@ properties:
description: Second Level
properties:
third_level_property_1:
type: file
$ref: "#/$defs/file"
description: Third Level
example_property_2:
type: object
description: Second Level
properties:
third_level_property_1:
type: file
$ref: "#/$defs/file"
description: Third Level
project:
$defs:
image:
type: object
object_type: image
properties:
path:
type: string
thumbnail_path:
type: string
title:
type: string
required:
- path
- thumbnail_path
- title
file:
type: object
object_type: file
properties:
project_output_file:
type: file
description: The path to the output file.
protocol:
path:
type: string
title:
type: string
description: example protocol description
required:
- path
- title
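Tying the schema file above back to the parser, a minimal usage sketch (the path is relative to the repository root and mirrors the assertion kept in the test below):

```python
from pipestat.parsed_schema import ParsedSchema

schema = ParsedSchema("tests/data/output_schema_as_JSON_schema.yaml")
# Sample-level properties survive parsing with their $refs resolved:
assert "number_of_things" in dict(schema.sample_level_data)
```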
1 change: 0 additions & 1 deletion tests/test_parsed_schema.py
@@ -262,4 +262,3 @@ def test_sample_project_data_item_name_overlap__raises_expected_error_and_messag
def test_JSON_schema_validation(output_schema_as_JSON_schema):
schema = ParsedSchema(output_schema_as_JSON_schema)
assert "number_of_things" in dict(schema.sample_level_data).keys()
assert "protocol" in dict(schema.project_level_data).keys()
2 changes: 1 addition & 1 deletion tests/test_pipestat.py
@@ -1158,7 +1158,7 @@ def test_linking(
},
{
"sample2": {
"output_file_in_object": {
"nested_object": {
"example_property_1": {
"path": path_file_1,
"thumbnail_path": "path_string",
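For context, a hedged sketch of how a nested value like the one above is reported through pipestat; the constructor arguments and the collapsed tail of the values dict are assumptions, not part of this diff:

```python
from pipestat import PipestatManager

psm = PipestatManager(
    schema_path="tests/data/output_schema_as_JSON_schema.yaml",
    results_file_path="results.yaml",  # hypothetical file backend
)
psm.report(
    record_identifier="sample2",
    values={
        "nested_object": {
            "example_property_1": {
                "path": "/tmp/file_1.txt",  # hypothetical stand-in for path_file_1
                "thumbnail_path": "path_string",
            }
        }
    },
)
```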
