From 60ba7207c98a03bc9133cd34af9632c2eecf0552 Mon Sep 17 00:00:00 2001 From: Robert Isele Date: Mon, 21 Oct 2024 16:28:26 +0200 Subject: [PATCH 1/5] Add TypedEntitySchema and first prototype of file entities --- .../typed_entities/__init__.py | 13 +++++ .../dataintegration/typed_entities/file.py | 58 +++++++++++++++++++ .../typed_entities/typed_entities.py | 52 +++++++++++++++++ tests/typed_entities/__init__.py | 1 + 4 files changed, 124 insertions(+) create mode 100644 cmem_plugin_base/dataintegration/typed_entities/__init__.py create mode 100644 cmem_plugin_base/dataintegration/typed_entities/file.py create mode 100644 cmem_plugin_base/dataintegration/typed_entities/typed_entities.py create mode 100644 tests/typed_entities/__init__.py diff --git a/cmem_plugin_base/dataintegration/typed_entities/__init__.py b/cmem_plugin_base/dataintegration/typed_entities/__init__.py new file mode 100644 index 0000000..7c79062 --- /dev/null +++ b/cmem_plugin_base/dataintegration/typed_entities/__init__.py @@ -0,0 +1,13 @@ +"""Custom entity schema that holds entities of a specific type (e.g. files)""" + +def type_uri(suffix: str) -> str: + """Create a new entity schema type URI.""" + return "https://vocab.eccenca.com/di/entity/type/" + suffix + +def path_uri(suffix: str) -> str: + """Create a new entity schema path.""" + return "https://vocab.eccenca.com/di/entity/path/" + suffix + +def instance_uri(suffix: str) -> str: + """Create a new typed entity instance URI""" + return "https://eccenca.com/di/entity/" + suffix diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py new file mode 100644 index 0000000..823c43b --- /dev/null +++ b/cmem_plugin_base/dataintegration/typed_entities/file.py @@ -0,0 +1,58 @@ +"""File entities""" + +from cmem_plugin_base.dataintegration.entity import Entity, EntityPath +from cmem_plugin_base.dataintegration.typed_entities import instance_uri, path_uri, type_uri +from cmem_plugin_base.dataintegration.typed_entities.typed_entities import ( + TypedEntitySchema, +) + + +class File: + """A file entity that can be held in a FileEntitySchema.""" + + def __init__(self, path: str, file_type: str, mime: str | None) -> None: + self.path = path + self.file_type = file_type + self.mime = mime + + +class LocalFile(File): + """A file that's located on the local file system.""" + + def __init__(self, path: str, mime: str | None = None) -> None: + super().__init__(path, "Local", mime) + + +class ProjectFile(File): + """A project file""" + + def __init__(self, path: str, mime: str | None = None) -> None: + super().__init__(path, "Project", mime) + + +class FileEntitySchema(TypedEntitySchema[File]): + """Entity schema that holds a collection of files.""" + + def __init__(self): + super().__init__( + type_uri=type_uri("File"), + paths=[ + EntityPath(path_uri("filePath")), + EntityPath(path_uri("fileType")), + EntityPath(path_uri("mimeType")), + ] + ) + + def to_entity(self, value: File) -> Entity: + """Create a generic entity from a file""" + return Entity(instance_uri(value.path), [value.path, value.file_type, value.mime or ""]) + + def from_entity(self, entity: Entity) -> File: + """Create a file entity from a generic entity.""" + path = entity.values[0][0] + mime = entity.values[2][0] if entity.values[2][0] else None + match entity.values[1][0]: + case "Local": + return LocalFile(path, mime) + case "Project": + return ProjectFile(path, mime) diff --git a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py new file mode 100644 index 0000000..d6f7b71 --- /dev/null +++ b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py @@ -0,0 +1,52 @@ +"""Custom entity schema that holds entities of a specific type (e.g. files)""" + +from collections.abc import Iterator, Sequence +from typing import Generic, TypeVar + +from cmem_plugin_base.dataintegration.entity import Entities, Entity, EntityPath, EntitySchema + +T = TypeVar("T") + +class TypedEntitySchema(EntitySchema, Generic[T]): + """A custom entity schema that holds entities of a specific type (e.g. files).""" + + def __init__(self, type_uri: str, paths: Sequence[EntityPath]): + super().__init__(type_uri, paths) + + def to_entity(self, value: T) -> Entity: + """Create a generic entity from a typed entity.""" + + def from_entity(self, entity: Entity) -> T: + """Create a typed entity from a generic entity. + + Implementations may assume that the incoming schema matches the schema expected by + this typed schema, i.e., schema validation is not required. + """ + + def to_entities(self, values: Iterator[T]) -> "TypedEntities[T]": + """Given a collection of values, create a new typed entities instance.""" + return TypedEntities(values, self) + + def from_entities(self, entities: Entities) -> "TypedEntities[T] | None": + """Create typed entities from generic entities. + + Returns None if the entities do not match the target type. + """ + # TODO(robert): add validation + # CMEM-6095 + if entities.schema.type_uri == self.type_uri: + if isinstance(entities, TypedEntities): + return entities + return TypedEntities(map(self.from_entity, entities.entities), self) + return None + + +class TypedEntities(Entities, Generic[T]): + """Collection of entities of a particular type.""" + + def __init__(self, values: Iterator[T], schema: TypedEntitySchema[T]): + super().__init__(map(schema.to_entity, values), schema) + self.values = values + self.schema = schema + + diff --git a/tests/typed_entities/__init__.py b/tests/typed_entities/__init__.py new file mode 100644 index 0000000..30ea453 --- /dev/null +++ b/tests/typed_entities/__init__.py @@ -0,0 +1 @@ +"""tests""" From d262d602e7ac3b57d71ee31021b419d02379508a Mon Sep 17 00:00:00 2001 From: Robert Isele Date: Mon, 21 Oct 2024 16:37:16 +0200 Subject: [PATCH 2/5] Fixed mypy issues --- cmem_plugin_base/dataintegration/typed_entities/file.py | 5 ++++- .../dataintegration/typed_entities/typed_entities.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py index 823c43b..0fcc2d3 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/file.py +++ b/cmem_plugin_base/dataintegration/typed_entities/file.py @@ -50,9 +50,12 @@ def to_entity(self, value: File) -> Entity: def from_entity(self, entity: Entity) -> File: """Create a file entity from a generic entity.""" path = entity.values[0][0] + file_type = entity.values[1][0] mime = entity.values[2][0] if entity.values[2][0] else None - match entity.values[1][0]: + match file_type: case "Local": return LocalFile(path, mime) case "Project": return ProjectFile(path, mime) + case _: + raise ValueError(f"File '{path}' has unexpected type '{file_type}'.") diff --git a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py index d6f7b71..93f70ce 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py +++ b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py @@ -1,5 +1,5 @@ """Custom entity schema that holds entities of a specific type (e.g. files)""" - +from abc import abstractmethod from collections.abc import Iterator, Sequence from typing import Generic, TypeVar @@ -13,9 +13,11 @@ class TypedEntitySchema(EntitySchema, Generic[T]): def __init__(self, type_uri: str, paths: Sequence[EntityPath]): super().__init__(type_uri, paths) + @abstractmethod def to_entity(self, value: T) -> Entity: """Create a generic entity from a typed entity.""" + @abstractmethod def from_entity(self, entity: Entity) -> T: """Create a typed entity from a generic entity. From a327d18b1c145e6a661a3b9c7f1af153974bbb8a Mon Sep 17 00:00:00 2001 From: Robert Isele Date: Mon, 21 Oct 2024 16:40:48 +0200 Subject: [PATCH 3/5] Format new files --- cmem_plugin_base/dataintegration/typed_entities/__init__.py | 3 +++ cmem_plugin_base/dataintegration/typed_entities/file.py | 4 ++-- .../dataintegration/typed_entities/typed_entities.py | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmem_plugin_base/dataintegration/typed_entities/__init__.py b/cmem_plugin_base/dataintegration/typed_entities/__init__.py index 7c79062..187e545 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/__init__.py +++ b/cmem_plugin_base/dataintegration/typed_entities/__init__.py @@ -1,13 +1,16 @@ """Custom entity schema that holds entities of a specific type (e.g. files)""" + def type_uri(suffix: str) -> str: """Create a new entity schema type URI.""" return "https://vocab.eccenca.com/di/entity/type/" + suffix + def path_uri(suffix: str) -> str: """Create a new entity schema path.""" return "https://vocab.eccenca.com/di/entity/path/" + suffix + def instance_uri(suffix: str) -> str: """Create a new typed entity instance URI""" return "https://eccenca.com/di/entity/" + suffix diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py index 0fcc2d3..3998951 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/file.py +++ b/cmem_plugin_base/dataintegration/typed_entities/file.py @@ -40,7 +40,7 @@ def __init__(self): EntityPath(path_uri("filePath")), EntityPath(path_uri("fileType")), EntityPath(path_uri("mimeType")), - ] + ], ) def to_entity(self, value: File) -> Entity: @@ -51,7 +51,7 @@ def from_entity(self, entity: Entity) -> File: """Create a file entity from a generic entity.""" path = entity.values[0][0] file_type = entity.values[1][0] - mime = entity.values[2][0] if entity.values[2][0] else None + mime = entity.values[2][0] if entity.values[2][0] else None match file_type: case "Local": return LocalFile(path, mime) diff --git a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py index 93f70ce..8514d70 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py +++ b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py @@ -1,4 +1,5 @@ """Custom entity schema that holds entities of a specific type (e.g. files)""" + from abc import abstractmethod from collections.abc import Iterator, Sequence from typing import Generic, TypeVar @@ -7,6 +8,7 @@ T = TypeVar("T") + class TypedEntitySchema(EntitySchema, Generic[T]): """A custom entity schema that holds entities of a specific type (e.g. files).""" @@ -50,5 +52,3 @@ def __init__(self, values: Iterator[T], schema: TypedEntitySchema[T]): super().__init__(map(schema.to_entity, values), schema) self.values = values self.schema = schema - - From a98daec6165f9774448ed45be1ab69893878b636 Mon Sep 17 00:00:00 2001 From: Robert Isele Date: Wed, 23 Oct 2024 13:30:19 +0200 Subject: [PATCH 4/5] Added test for TypedEntities --- .../dataintegration/typed_entities/file.py | 5 +- .../typed_entities/typed_entities.py | 6 +- tests/typed_entities/test_typed_entities.py | 67 +++++++++++++++++++ 3 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 tests/typed_entities/test_typed_entities.py diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py index 3998951..7f7ef1b 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/file.py +++ b/cmem_plugin_base/dataintegration/typed_entities/file.py @@ -45,7 +45,10 @@ def __init__(self): def to_entity(self, value: File) -> Entity: """Create a generic entity from a file""" - return Entity(instance_uri(value.path), [value.path, value.file_type, value.mime or ""]) + return Entity( + uri = instance_uri(value.path), + values = [[value.path], [value.file_type], [value.mime or ""]] + ) def from_entity(self, entity: Entity) -> File: """Create a file entity from a generic entity.""" diff --git a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py index 8514d70..0b2e9fd 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py +++ b/cmem_plugin_base/dataintegration/typed_entities/typed_entities.py @@ -31,7 +31,7 @@ def to_entities(self, values: Iterator[T]) -> "TypedEntities[T]": """Given a collection of values, create a new typed entities instance.""" return TypedEntities(values, self) - def from_entities(self, entities: Entities) -> "TypedEntities[T] | None": + def from_entities(self, entities: Entities) -> "TypedEntities[T]": """Create typed entities from generic entities. Returns None if the entities do not match the target type. @@ -42,8 +42,8 @@ def from_entities(self, entities: Entities) -> "TypedEntities[T] | None": if isinstance(entities, TypedEntities): return entities return TypedEntities(map(self.from_entity, entities.entities), self) - return None - + raise ValueError( + f"Expected entities of type '{self.type_uri}' but got '{entities.schema.type_uri}'.") class TypedEntities(Entities, Generic[T]): """Collection of entities of a particular type.""" diff --git a/tests/typed_entities/test_typed_entities.py b/tests/typed_entities/test_typed_entities.py new file mode 100644 index 0000000..629c485 --- /dev/null +++ b/tests/typed_entities/test_typed_entities.py @@ -0,0 +1,67 @@ +"""Test for the typed entities feature.""" + +import tempfile +import unittest +from collections.abc import Sequence +from pathlib import Path + +from cmem_plugin_base.dataintegration.context import ExecutionContext +from cmem_plugin_base.dataintegration.entity import Entities +from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin +from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort +from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema, LocalFile + + +class ConcatFilesOperator(WorkflowPlugin): + """Test operator that concatenates input files.""" + + def __init__(self): + self.input_ports = FixedNumberOfInputs([FixedSchemaPort(FileEntitySchema())]) + self.output_port = FixedSchemaPort(FileEntitySchema()) + + def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None: + """Concatenate input files""" + input_files = FileEntitySchema().from_entities(inputs[0]) + input_files + + with tempfile.NamedTemporaryFile(mode="wb", delete=False, delete_on_close=True) as o_file: + output_name = o_file.name + for file in input_files.values: + if isinstance(file, LocalFile): + with Path(file.path).open("rb") as f: + contents = f.read() + o_file.write(contents) + + return FileEntitySchema().to_entities(iter([LocalFile(output_name)])) + + +class TypedEntitiesTest(unittest.TestCase): + """Test for the typed entities feature.""" + + def test_files(self) -> None: + """Test file entity schema.""" + # Create two files + temp1 = tempfile.NamedTemporaryFile(delete=False, delete_on_close=True, mode="w") + temp1.write("ABC") + temp1.close() + temp2 = tempfile.NamedTemporaryFile(delete=False, delete_on_close=True, mode="w") + temp2.write("123") + temp2.close() + + # Execute operator + input_entities = FileEntitySchema().to_entities( + iter([LocalFile(temp1.name), LocalFile(temp2.name)]) + ) + output = ConcatFilesOperator().execute([input_entities], ExecutionContext()) + + # Check output + assert output is not None + output_entities = list(output.entities) + assert len(output_entities) == 1 + with Path(FileEntitySchema().from_entity(output_entities[0]).path).open() as output_file: + output_str = output_file.read() + assert output_str == "ABC123" + + +if __name__ == "__main__": + unittest.main() From c91f6efb1c6454eb5bd4f172b3671c5e6a2d8503 Mon Sep 17 00:00:00 2001 From: Robert Isele Date: Wed, 23 Oct 2024 13:30:40 +0200 Subject: [PATCH 5/5] Minor fix --- tests/typed_entities/test_typed_entities.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/typed_entities/test_typed_entities.py b/tests/typed_entities/test_typed_entities.py index 629c485..46fb9ef 100644 --- a/tests/typed_entities/test_typed_entities.py +++ b/tests/typed_entities/test_typed_entities.py @@ -22,7 +22,6 @@ def __init__(self): def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None: """Concatenate input files""" input_files = FileEntitySchema().from_entities(inputs[0]) - input_files with tempfile.NamedTemporaryFile(mode="wb", delete=False, delete_on_close=True) as o_file: output_name = o_file.name