Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TypedEntitySchema #18

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmem_plugin_base/dataintegration/typed_entities/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Custom entity schema that holds entities of a specific type (e.g. files)"""


def type_uri(suffix: str) -> str:
"""Create a new entity schema type URI."""
return "https://vocab.eccenca.com/di/entity/type/" + suffix


def path_uri(suffix: str) -> str:
"""Create a new entity schema path."""
return "https://vocab.eccenca.com/di/entity/path/" + suffix


def instance_uri(suffix: str) -> str:
"""Create a new typed entity instance URI"""
return "https://eccenca.com/di/entity/" + suffix
64 changes: 64 additions & 0 deletions cmem_plugin_base/dataintegration/typed_entities/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""File entities"""

from cmem_plugin_base.dataintegration.entity import Entity, EntityPath
from cmem_plugin_base.dataintegration.typed_entities import instance_uri, path_uri, type_uri
from cmem_plugin_base.dataintegration.typed_entities.typed_entities import (
TypedEntitySchema,
)


class File:
"""A file entity that can be held in a FileEntitySchema."""

def __init__(self, path: str, file_type: str, mime: str | None) -> None:
self.path = path
self.file_type = file_type
self.mime = mime


class LocalFile(File):
"""A file that's located on the local file system."""

def __init__(self, path: str, mime: str | None = None) -> None:
super().__init__(path, "Local", mime)


class ProjectFile(File):
"""A project file"""

def __init__(self, path: str, mime: str | None = None) -> None:
super().__init__(path, "Project", mime)


class FileEntitySchema(TypedEntitySchema[File]):
"""Entity schema that holds a collection of files."""

def __init__(self):
super().__init__(
type_uri=type_uri("File"),
paths=[
EntityPath(path_uri("filePath")),
EntityPath(path_uri("fileType")),
EntityPath(path_uri("mimeType")),
],
)

def to_entity(self, value: File) -> Entity:
"""Create a generic entity from a file"""
return Entity(
uri = instance_uri(value.path),
values = [[value.path], [value.file_type], [value.mime or ""]]
)

def from_entity(self, entity: Entity) -> File:
"""Create a file entity from a generic entity."""
path = entity.values[0][0]
file_type = entity.values[1][0]
mime = entity.values[2][0] if entity.values[2][0] else None
match file_type:
case "Local":
return LocalFile(path, mime)
case "Project":
return ProjectFile(path, mime)
case _:
raise ValueError(f"File '{path}' has unexpected type '{file_type}'.")
54 changes: 54 additions & 0 deletions cmem_plugin_base/dataintegration/typed_entities/typed_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Custom entity schema that holds entities of a specific type (e.g. files)"""

from abc import abstractmethod
from collections.abc import Iterator, Sequence
from typing import Generic, TypeVar

from cmem_plugin_base.dataintegration.entity import Entities, Entity, EntityPath, EntitySchema

T = TypeVar("T")


class TypedEntitySchema(EntitySchema, Generic[T]):
"""A custom entity schema that holds entities of a specific type (e.g. files)."""

def __init__(self, type_uri: str, paths: Sequence[EntityPath]):
super().__init__(type_uri, paths)

@abstractmethod
def to_entity(self, value: T) -> Entity:
"""Create a generic entity from a typed entity."""

@abstractmethod
def from_entity(self, entity: Entity) -> T:
"""Create a typed entity from a generic entity.

Implementations may assume that the incoming schema matches the schema expected by
this typed schema, i.e., schema validation is not required.
"""

def to_entities(self, values: Iterator[T]) -> "TypedEntities[T]":
"""Given a collection of values, create a new typed entities instance."""
return TypedEntities(values, self)

def from_entities(self, entities: Entities) -> "TypedEntities[T]":
"""Create typed entities from generic entities.

Returns None if the entities do not match the target type.
"""
# TODO(robert): add validation
# CMEM-6095
if entities.schema.type_uri == self.type_uri:
if isinstance(entities, TypedEntities):
return entities
return TypedEntities(map(self.from_entity, entities.entities), self)
raise ValueError(
f"Expected entities of type '{self.type_uri}' but got '{entities.schema.type_uri}'.")

class TypedEntities(Entities, Generic[T]):
"""Collection of entities of a particular type."""

def __init__(self, values: Iterator[T], schema: TypedEntitySchema[T]):
super().__init__(map(schema.to_entity, values), schema)
self.values = values
self.schema = schema
1 change: 1 addition & 0 deletions tests/typed_entities/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""tests"""
66 changes: 66 additions & 0 deletions tests/typed_entities/test_typed_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Test for the typed entities feature."""

import tempfile
import unittest
from collections.abc import Sequence
from pathlib import Path

from cmem_plugin_base.dataintegration.context import ExecutionContext
from cmem_plugin_base.dataintegration.entity import Entities
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort
from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema, LocalFile


class ConcatFilesOperator(WorkflowPlugin):
"""Test operator that concatenates input files."""

def __init__(self):
self.input_ports = FixedNumberOfInputs([FixedSchemaPort(FileEntitySchema())])
self.output_port = FixedSchemaPort(FileEntitySchema())

def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None:
"""Concatenate input files"""
input_files = FileEntitySchema().from_entities(inputs[0])

with tempfile.NamedTemporaryFile(mode="wb", delete=False, delete_on_close=True) as o_file:
output_name = o_file.name
for file in input_files.values:
if isinstance(file, LocalFile):
with Path(file.path).open("rb") as f:
contents = f.read()
o_file.write(contents)

return FileEntitySchema().to_entities(iter([LocalFile(output_name)]))


class TypedEntitiesTest(unittest.TestCase):
"""Test for the typed entities feature."""

def test_files(self) -> None:
"""Test file entity schema."""
# Create two files
temp1 = tempfile.NamedTemporaryFile(delete=False, delete_on_close=True, mode="w")
temp1.write("ABC")
temp1.close()
temp2 = tempfile.NamedTemporaryFile(delete=False, delete_on_close=True, mode="w")
temp2.write("123")
temp2.close()

# Execute operator
input_entities = FileEntitySchema().to_entities(
iter([LocalFile(temp1.name), LocalFile(temp2.name)])
)
output = ConcatFilesOperator().execute([input_entities], ExecutionContext())

# Check output
assert output is not None
output_entities = list(output.entities)
assert len(output_entities) == 1
with Path(FileEntitySchema().from_entity(output_entities[0]).path).open() as output_file:
output_str = output_file.read()
assert output_str == "ABC123"


if __name__ == "__main__":
unittest.main()
Loading