diff --git a/cmem_plugin_base/dataintegration/typed_entities/__init__.py b/cmem_plugin_base/dataintegration/typed_entities/__init__.py
new file mode 100644
index 0000000..187e545
--- /dev/null
+++ b/cmem_plugin_base/dataintegration/typed_entities/__init__.py
@@ -0,0 +1,16 @@
+"""Custom entity schema that holds entities of a specific type (e.g. files)."""
+
+
+def type_uri(suffix: str) -> str:
+    """Create a new entity schema type URI."""
+    return "https://vocab.eccenca.com/di/entity/type/" + suffix
+
+
+def path_uri(suffix: str) -> str:
+    """Create a new entity schema path."""
+    return "https://vocab.eccenca.com/di/entity/path/" + suffix
+
+
+def instance_uri(suffix: str) -> str:
+    """Create a new typed entity instance URI."""
+    return "https://eccenca.com/di/entity/" + suffix
diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py
new file mode 100644
index 0000000..7f7ef1b
--- /dev/null
+++ b/cmem_plugin_base/dataintegration/typed_entities/file.py
@@ -0,0 +1,72 @@
+"""File entities."""
+
+from cmem_plugin_base.dataintegration.entity import Entity, EntityPath
+from cmem_plugin_base.dataintegration.typed_entities import instance_uri, path_uri, type_uri
+from cmem_plugin_base.dataintegration.typed_entities.typed_entities import (
+    TypedEntitySchema,
+)
+
+
+class File:
+    """A file entity that can be held in a FileEntitySchema."""
+
+    def __init__(self, path: str, file_type: str, mime: str | None) -> None:
+        self.path = path
+        self.file_type = file_type
+        self.mime = mime
+
+
+class LocalFile(File):
+    """A file that's located on the local file system."""
+
+    def __init__(self, path: str, mime: str | None = None) -> None:
+        super().__init__(path, "Local", mime)
+
+
+class ProjectFile(File):
+    """A project file."""
+
+    def __init__(self, path: str, mime: str | None = None) -> None:
+        super().__init__(path, "Project", mime)
+
+
+class FileEntitySchema(TypedEntitySchema[File]):
+    """Entity schema that holds a collection of files."""
+
+    def __init__(self) -> None:
+        super().__init__(
+            type_uri=type_uri("File"),
+            paths=[
+                EntityPath(path_uri("filePath")),
+                EntityPath(path_uri("fileType")),
+                EntityPath(path_uri("mimeType")),
+            ],
+        )
+
+    def to_entity(self, value: File) -> Entity:
+        """Create a generic entity from a file.
+
+        The value order matches the schema paths: file path, file type, MIME type.
+        A missing MIME type is stored as an empty string.
+        """
+        return Entity(
+            uri=instance_uri(value.path),
+            values=[[value.path], [value.file_type], [value.mime or ""]],
+        )
+
+    def from_entity(self, entity: Entity) -> File:
+        """Create a file entity from a generic entity.
+
+        Raises ValueError if the file type is neither "Local" nor "Project".
+        """
+        path = entity.values[0][0]
+        file_type = entity.values[1][0]
+        # A missing MIME type may arrive as an empty string or as an empty value list.
+        mime = next(iter(entity.values[2]), "") or None
+        match file_type:
+            case "Local":
+                return LocalFile(path, mime)
+            case "Project":
+                return ProjectFile(path, mime)
+            case _:
+                raise ValueError(f"File '{path}' has unexpected type '{file_type}'.")
diff --git a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py
new file mode 100644
index 0000000..0b2e9fd
--- /dev/null
+++ b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py
@@ -0,0 +1,63 @@
+"""Custom entity schema that holds entities of a specific type (e.g. files)."""
+
+from abc import abstractmethod
+from collections.abc import Iterator, Sequence
+from typing import Generic, TypeVar
+
+from cmem_plugin_base.dataintegration.entity import Entities, Entity, EntityPath, EntitySchema
+
+T = TypeVar("T")
+
+
+class TypedEntitySchema(EntitySchema, Generic[T]):
+    """A custom entity schema that holds entities of a specific type (e.g. files)."""
+
+    def __init__(self, type_uri: str, paths: Sequence[EntityPath]) -> None:
+        super().__init__(type_uri, paths)
+
+    @abstractmethod
+    def to_entity(self, value: T) -> Entity:
+        """Create a generic entity from a typed entity."""
+
+    @abstractmethod
+    def from_entity(self, entity: Entity) -> T:
+        """Create a typed entity from a generic entity.
+
+        Implementations may assume that the incoming schema matches the schema expected by
+        this typed schema, i.e., schema validation is not required.
+        """
+
+    def to_entities(self, values: Iterator[T]) -> "TypedEntities[T]":
+        """Given a collection of values, create a new typed entities instance."""
+        return TypedEntities(values, self)
+
+    def from_entities(self, entities: Entities) -> "TypedEntities[T]":
+        """Create typed entities from generic entities.
+
+        Entities that are already typed are returned unchanged.
+
+        Raises ValueError if the entities do not match the target type.
+        """
+        # TODO(robert): add validation
+        # CMEM-6095
+        if entities.schema.type_uri != self.type_uri:
+            raise ValueError(
+                f"Expected entities of type '{self.type_uri}' "
+                f"but got '{entities.schema.type_uri}'."
+            )
+        if isinstance(entities, TypedEntities):
+            return entities
+        return TypedEntities(map(self.from_entity, entities.entities), self)
+
+
+class TypedEntities(Entities, Generic[T]):
+    """Collection of entities of a particular type.
+
+    Note: ``values`` and the generic entities passed to the base class are backed by the
+    same iterator, so consuming one is expected to exhaust the other as well.
+    """
+
+    def __init__(self, values: Iterator[T], schema: TypedEntitySchema[T]) -> None:
+        super().__init__(map(schema.to_entity, values), schema)
+        self.values = values
+        self.schema = schema
diff --git a/tests/typed_entities/__init__.py b/tests/typed_entities/__init__.py
new file mode 100644
index 0000000..30ea453
--- /dev/null
+++ b/tests/typed_entities/__init__.py
@@ -0,0 +1 @@
+"""tests"""
diff --git a/tests/typed_entities/test_typed_entities.py b/tests/typed_entities/test_typed_entities.py
new file mode 100644
index 0000000..46fb9ef
--- /dev/null
+++ b/tests/typed_entities/test_typed_entities.py
@@ -0,0 +1,63 @@
+"""Test for the typed entities feature."""
+
+import tempfile
+import unittest
+from collections.abc import Sequence
+from pathlib import Path
+
+from cmem_plugin_base.dataintegration.context import ExecutionContext
+from cmem_plugin_base.dataintegration.entity import Entities
+from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
+from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort
+from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema, LocalFile
+
+
+class ConcatFilesOperator(WorkflowPlugin):
+    """Test operator that concatenates input files."""
+
+    def __init__(self) -> None:
+        self.input_ports = FixedNumberOfInputs([FixedSchemaPort(FileEntitySchema())])
+        self.output_port = FixedSchemaPort(FileEntitySchema())
+
+    def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None:
+        """Concatenate all local input files into a new temporary file."""
+        input_files = FileEntitySchema().from_entities(inputs[0])
+
+        # delete=False: the file must outlive this block so that the caller can read it.
+        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as o_file:
+            output_name = o_file.name
+            for file in input_files.values:
+                if isinstance(file, LocalFile):
+                    o_file.write(Path(file.path).read_bytes())
+
+        return FileEntitySchema().to_entities(iter([LocalFile(output_name)]))
+
+
+class TypedEntitiesTest(unittest.TestCase):
+    """Test for the typed entities feature."""
+
+    def test_files(self) -> None:
+        """Test file entity schema."""
+        # Create two input files and make sure they are removed after the test
+        input_paths: list[str] = []
+        for content in ("ABC", "123"):
+            with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
+                temp.write(content)
+            input_paths.append(temp.name)
+            self.addCleanup(Path(temp.name).unlink, missing_ok=True)
+
+        # Execute operator
+        input_entities = FileEntitySchema().to_entities(map(LocalFile, input_paths))
+        output = ConcatFilesOperator().execute([input_entities], ExecutionContext())
+
+        # Check output
+        assert output is not None
+        output_entities = list(output.entities)
+        assert len(output_entities) == 1
+        output_path = Path(FileEntitySchema().from_entity(output_entities[0]).path)
+        self.addCleanup(output_path.unlink, missing_ok=True)
+        assert output_path.read_text() == "ABC123"
+
+
+if __name__ == "__main__":
+    unittest.main()