diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py
index 1ec30e67b..dbdb93a59 100644
--- a/cognee/api/v1/add/add_v2.py
+++ b/cognee/api/v1/add/add_v2.py
@@ -2,7 +2,7 @@
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines import run_tasks, Task
-from cognee.tasks.ingestion import ingest_data_with_metadata, resolve_data_directories
+from cognee.tasks.ingestion import ingest_data, resolve_data_directories
 from cognee.infrastructure.databases.relational import (
     create_db_and_tables as create_relational_db_and_tables,
 )
@@ -22,7 +22,7 @@ async def add(
     if user is None:
         user = await get_default_user()
 
-    tasks = [Task(resolve_data_directories), Task(ingest_data_with_metadata, dataset_name, user)]
+    tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
 
     pipeline = run_tasks(tasks, data, "add_pipeline")
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index 2d077f39b..dc2af0cd5 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -10,7 +10,7 @@
 from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
 from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
 from cognee.tasks.graph import extract_graph_from_data
-from cognee.tasks.ingestion import ingest_data_with_metadata
+from cognee.tasks.ingestion import ingest_data
 from cognee.tasks.repo_processor import (
     enrich_dependency_graph,
     expand_dependency_graph,
@@ -68,7 +68,7 @@ async def run_code_graph_pipeline(repo_path, include_docs=True):
     if include_docs:
         non_code_tasks = [
             Task(get_non_py_files, task_config={"batch_size": 50}),
-            Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
+            Task(ingest_data, dataset_name="repo_docs", user=user),
             Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
             Task(classify_documents),
             Task(extract_chunks_from_documents, max_tokens=cognee_config.max_tokens),
diff --git a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py
index 27061ce45..f0a40ca36 100644
--- a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py
+++ b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py
@@ -95,7 +95,7 @@ async def exponential_backoff(attempt):
 
             return await self.embed_text(text)
 
-        except (litellm.exceptions.BadRequestError, litellm.llms.OpenAI.openai.OpenAIError):
+        except litellm.exceptions.BadRequestError:
             raise EmbeddingException("Failed to index data points.")
 
         except Exception as error:
diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py
index 78c02b9c9..cd71dd128 100644
--- a/cognee/modules/chunking/TextChunker.py
+++ b/cognee/modules/chunking/TextChunker.py
@@ -54,7 +54,6 @@ def read(self):
                         contains=[],
                         _metadata={
                             "index_fields": ["text"],
-                            "metadata_id": self.document.metadata_id,
                         },
                     )
                     paragraph_chunks = []
@@ -74,7 +73,6 @@ def read(self):
                             contains=[],
                             _metadata={
                                 "index_fields": ["text"],
-                                "metadata_id": self.document.metadata_id,
                             },
                         )
                     except Exception as e:
@@ -95,7 +93,7 @@ def read(self):
                 chunk_index=self.chunk_index,
                 cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
                 contains=[],
-                _metadata={"index_fields": ["text"], "metadata_id": self.document.metadata_id},
+                _metadata={"index_fields": ["text"]},
             )
         except Exception as e:
            print(e)
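
These first files are the consumer-side fallout: the `add` and code-graph pipelines swap in the renamed `ingest_data` task, the embedding engine narrows its except clause, and chunks stop carrying `metadata_id`. A minimal sketch of exercising the updated entry point (file path and dataset name are illustrative):

```python
import asyncio

import cognee


async def main():
    # add() builds [resolve_data_directories, ingest_data]
    # and runs them as the "add_pipeline".
    await cognee.add("/tmp/example.txt", dataset_name="example_dataset")


asyncio.run(main())
```
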
diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index cf8918db7..0c0d60d0d 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -1,13 +1,11 @@
 from datetime import datetime, timezone
-from typing import List
 from uuid import uuid4
 
-from sqlalchemy import UUID, Column, DateTime, String
-from sqlalchemy.orm import Mapped, relationship
+from sqlalchemy import UUID, Column, DateTime, String, JSON
+from sqlalchemy.orm import relationship
 
 from cognee.infrastructure.databases.relational import Base
 
 from .DatasetData import DatasetData
-from .Metadata import Metadata
 
 
 class Data(Base):
@@ -21,6 +19,7 @@ class Data(Base):
     raw_data_location = Column(String)
     owner_id = Column(UUID, index=True)
     content_hash = Column(String)
+    external_metadata = Column(JSON)
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
@@ -32,13 +31,6 @@ class Data(Base):
         cascade="all, delete",
     )
 
-    metadata_relationship = relationship(
-        "Metadata",
-        back_populates="data",
-        lazy="noload",
-        cascade="all, delete",
-    )
-
     def to_json(self) -> dict:
         return {
             "id": str(self.id),
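
The dropped `Metadata` relationship is replaced by the `external_metadata` JSON column above, so callers read metadata straight off the `Data` row. A minimal sketch of fetching it back, assuming the existing `get_relational_engine()` session helper:

```python
from uuid import UUID

from sqlalchemy import select

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data


async def get_external_metadata(data_id: UUID):
    db_engine = get_relational_engine()
    async with db_engine.get_async_session() as session:
        data = (
            await session.execute(select(Data).filter(Data.id == data_id))
        ).scalar_one_or_none()
        # The JSON column deserializes to a plain dict (or None if never set).
        return data.external_metadata if data else None
```
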
diff --git a/cognee/modules/data/models/Metadata.py b/cognee/modules/data/models/Metadata.py
deleted file mode 100644
index ab41d94be..000000000
--- a/cognee/modules/data/models/Metadata.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from datetime import datetime, timezone
-from uuid import uuid4
-
-from sqlalchemy import UUID, Column, DateTime, String, ForeignKey
-from sqlalchemy.orm import relationship
-
-from cognee.infrastructure.databases.relational import Base
-
-
-class Metadata(Base):
-    __tablename__ = "metadata_table"
-
-    id = Column(UUID, primary_key=True, default=uuid4)
-    metadata_repr = Column(String)
-    metadata_source = Column(String)
-
-    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
-    updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
-
-    data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key=False)
-    data = relationship("Data", back_populates="metadata_relationship")
diff --git a/cognee/modules/data/operations/delete_metadata.py b/cognee/modules/data/operations/delete_metadata.py
deleted file mode 100644
index df94f52ed..000000000
--- a/cognee/modules/data/operations/delete_metadata.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import warnings
-from uuid import UUID
-
-from sqlalchemy import select
-
-from cognee.infrastructure.databases.relational import get_relational_engine
-
-from ..models.Metadata import Metadata
-
-
-async def delete_metadata(metadata_id: UUID):
-    db_engine = get_relational_engine()
-    async with db_engine.get_async_session() as session:
-        metadata = await session.get(Metadata, metadata_id)
-        if metadata is None:
-            warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
-
-        session.delete(metadata)
-        session.commit()
diff --git a/cognee/modules/data/operations/get_metadata.py b/cognee/modules/data/operations/get_metadata.py
deleted file mode 100644
index f827c47c3..000000000
--- a/cognee/modules/data/operations/get_metadata.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import json
-from uuid import UUID
-
-from sqlalchemy import select
-
-from cognee.infrastructure.databases.relational import get_relational_engine
-
-from ..models.Metadata import Metadata
-
-
-async def get_metadata(metadata_id: UUID) -> Metadata:
-    db_engine = get_relational_engine()
-
-    async with db_engine.get_async_session() as session:
-        metadata = await session.get(Metadata, metadata_id)
-
-        return metadata
diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py
deleted file mode 100644
index 3c2c839c2..000000000
--- a/cognee/modules/data/operations/write_metadata.py
+++ /dev/null
@@ -1,65 +0,0 @@
-import inspect
-import json
-import re
-import warnings
-from uuid import UUID
-from sqlalchemy import select
-from typing import Any, BinaryIO, Union
-
-from cognee.infrastructure.databases.relational import get_relational_engine
-from cognee.infrastructure.files.utils.get_file_metadata import FileMetadata
-from ..models.Metadata import Metadata
-
-
-async def write_metadata(
-    data_item: Union[BinaryIO, str, Any], data_id: UUID, file_metadata: FileMetadata
-) -> UUID:
-    metadata_dict = get_metadata_dict(data_item, file_metadata)
-    db_engine = get_relational_engine()
-    async with db_engine.get_async_session() as session:
-        metadata = (
-            await session.execute(select(Metadata).filter(Metadata.data_id == data_id))
-        ).scalar_one_or_none()
-
-        if metadata is not None:
-            metadata.metadata_repr = json.dumps(metadata_dict)
-            metadata.metadata_source = parse_type(type(data_item))
-            await session.merge(metadata)
-        else:
-            metadata = Metadata(
-                id=data_id,
-                metadata_repr=json.dumps(metadata_dict),
-                metadata_source=parse_type(type(data_item)),
-                data_id=data_id,
-            )
-            session.add(metadata)
-
-        await session.commit()
-
-
-def parse_type(type_: Any) -> str:
-    pattern = r".+'([\w_\.]+)'"
-    match = re.search(pattern, str(type_))
-    if match:
-        return match.group(1)
-    else:
-        raise Exception(f"type: {type_} could not be parsed")
-
-
-def get_metadata_dict(
-    data_item: Union[BinaryIO, str, Any], file_metadata: FileMetadata
-) -> dict[str, Any]:
-    if isinstance(data_item, str):
-        return file_metadata
-    elif isinstance(data_item, BinaryIO):
-        return file_metadata
-    elif hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
-        return {**file_metadata, **data_item.dict()}
-    else:
-        warnings.warn(
-            f"metadata of type {type(data_item)}: {str(data_item)[:20]}... does not have dict method. Defaulting to string method"
-        )
-        try:
-            return {**dict(file_metadata), "content": str(data_item)}
-        except Exception as e:
-            raise Exception(f"Could not cast metadata to string: {e}")
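
All four metadata helpers disappear because their job is now done inline during ingestion: the `get_external_metadata_dict` closure added to `ingest_data` below captures roughly what `write_metadata` used to serialize into `metadata_repr`, but as a structured dict. An illustrative payload, assuming a source object with a Pydantic-style `.dict()` method (keys mirror the new helper; values are made up):

```python
# "metadata" is data_item.dict(), "origin" is str(type(data_item)).
external_metadata = {
    "metadata": {"id_": "c4f2...", "text": "Example document body."},
    "origin": "<class 'llama_index.core.schema.Document'>",
}
```
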
diff --git a/cognee/modules/data/processing/document_types/Document.py b/cognee/modules/data/processing/document_types/Document.py
index 7ecdf289e..4d9f3bf72 100644
--- a/cognee/modules/data/processing/document_types/Document.py
+++ b/cognee/modules/data/processing/document_types/Document.py
@@ -7,7 +7,7 @@ class Document(DataPoint):
     name: str
     raw_data_location: str
-    metadata_id: UUID
+    external_metadata: Optional[str]
     mime_type: str
 
     _metadata: dict = {"index_fields": ["name"], "type": "Document"}
diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py
index 5c84afc7e..7211ebced 100644
--- a/cognee/tasks/documents/classify_documents.py
+++ b/cognee/tasks/documents/classify_documents.py
@@ -1,4 +1,5 @@
 from cognee.modules.data.models import Data
+import json
 from cognee.modules.data.processing.document_types import (
     Document,
     PdfDocument,
@@ -7,7 +8,6 @@
     TextDocument,
     UnstructuredDocument,
 )
-from cognee.modules.data.operations.get_metadata import get_metadata
 
 EXTENSION_TO_DOCUMENT_CLASS = {
     "pdf": PdfDocument,  # Text documents
@@ -59,14 +59,13 @@ async def classify_documents(data_documents: list[Data]) -> list[Document]:
     """
     documents = []
     for data_item in data_documents:
-        metadata = await get_metadata(data_item.id)
         document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
             id=data_item.id,
             title=f"{data_item.name}.{data_item.extension}",
             raw_data_location=data_item.raw_data_location,
             name=data_item.name,
             mime_type=data_item.mime_type,
-            metadata_id=metadata.id,
+            external_metadata=json.dumps(data_item.external_metadata, indent=4),
         )
         documents.append(document)
diff --git a/cognee/tasks/ingestion/__init__.py b/cognee/tasks/ingestion/__init__.py
index 8b873b273..9c7180be2 100644
--- a/cognee/tasks/ingestion/__init__.py
+++ b/cognee/tasks/ingestion/__init__.py
@@ -1,6 +1,3 @@
-from .ingest_data import ingest_data
-from .save_data_to_storage import save_data_to_storage
 from .save_data_item_to_storage import save_data_item_to_storage
-from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
-from .ingest_data_with_metadata import ingest_data_with_metadata
+from .ingest_data import ingest_data
 from .resolve_data_directories import resolve_data_directories
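
Since `Data.external_metadata` is already loaded with each row, `classify_documents` drops its per-item `get_metadata` query and simply JSON-encodes the dict onto the document. A sketch of the resulting construction, mirroring the updated integration tests further down (values illustrative):

```python
import json
import uuid

from cognee.modules.data.processing.document_types import TextDocument

# On the Document, external_metadata is a JSON *string*
# (json.dumps of Data.external_metadata), not a dict.
document = TextDocument(
    id=uuid.uuid4(),
    name="notes",
    raw_data_location="/data/notes.txt",
    mime_type="text/plain",
    external_metadata=json.dumps({"origin": "str"}, indent=4),
)
```
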
diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py
index cf7dd38ad..924ef10b0 100644
--- a/cognee/tasks/ingestion/ingest_data.py
+++ b/cognee/tasks/ingestion/ingest_data.py
@@ -1,16 +1,24 @@
+from typing import Any, List
+
 import dlt
 import cognee.modules.ingestion as ingestion
-
-from uuid import UUID
-from cognee.shared.utils import send_telemetry
-from cognee.modules.users.models import User
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset
+from cognee.modules.data.models.DatasetData import DatasetData
+from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
+from cognee.shared.utils import send_telemetry
 from .get_dlt_destination import get_dlt_destination
+from .save_data_item_to_storage import (
+    save_data_item_to_storage,
+)
+from typing import Union, BinaryIO
+import inspect
+import warnings
 
-async def ingest_data(file_paths: list[str], dataset_name: str, user: User):
+
+async def ingest_data(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
 
     pipeline = dlt.pipeline(
@@ -18,12 +26,21 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
         destination=destination,
     )
 
-    @dlt.resource(standalone=True, merge_key="id")
-    async def data_resources(file_paths: str):
+    def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
+        if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
+            return {"metadata": data_item.dict(), "origin": str(type(data_item))}
+        else:
+            warnings.warn(
+                f"Data of type {type(data_item)}... does not have dict method. Returning empty metadata."
+            )
+            return {}
+
+    @dlt.resource(standalone=True, primary_key="id", merge_key="id")
+    async def data_resources(file_paths: List[str], user: User):
         for file_path in file_paths:
             with open(file_path.replace("file://", ""), mode="rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                data_id = ingestion.identify(classified_data, user)
                 file_metadata = classified_data.get_metadata()
                 yield {
                     "id": data_id,
@@ -31,71 +48,111 @@ async def data_resources(file_paths: str):
                     "name": file_metadata["name"],
                     "file_path": file_metadata["file_path"],
                     "extension": file_metadata["extension"],
                     "mime_type": file_metadata["mime_type"],
+                    "content_hash": file_metadata["content_hash"],
+                    "owner_id": str(user.id),
                 }
 
-    async def data_storing(table_name, dataset_name, user: User):
-        db_engine = get_relational_engine()
+    async def data_storing(data: Any, dataset_name: str, user: User):
+        if not isinstance(data, list):
+            # Convert data to a list as we work with lists further down.
+            data = [data]
 
-        async with db_engine.get_async_session() as session:
-            # Read metadata stored with dlt
-            files_metadata = await db_engine.get_all_data_from_table(table_name, dataset_name)
-            for file_metadata in files_metadata:
-                from sqlalchemy import select
-                from cognee.modules.data.models import Data
+        file_paths = []
 
-                dataset = await create_dataset(dataset_name, user.id, session)
+        # Process data
+        for data_item in data:
+            file_path = await save_data_item_to_storage(data_item, dataset_name)
 
-                data = (
-                    await session.execute(select(Data).filter(Data.id == UUID(file_metadata["id"])))
-                ).scalar_one_or_none()
+            file_paths.append(file_path)
 
-                if data is not None:
-                    data.name = file_metadata["name"]
-                    data.raw_data_location = file_metadata["file_path"]
-                    data.extension = file_metadata["extension"]
-                    data.mime_type = file_metadata["mime_type"]
+            # Ingest data and add metadata
+            with open(file_path.replace("file://", ""), mode="rb") as file:
+                classified_data = ingestion.classify(file)
+
+                # data_id is the hash of file contents + owner id to avoid duplicate data
+                data_id = ingestion.identify(classified_data, user)
+
+                file_metadata = classified_data.get_metadata()
+
+                from sqlalchemy import select
+
+                from cognee.modules.data.models import Data
+
+                db_engine = get_relational_engine()
+
+                async with db_engine.get_async_session() as session:
+                    dataset = await create_dataset(dataset_name, user.id, session)
+
+                    # Check to see if data should be updated
+                    data_point = (
+                        await session.execute(select(Data).filter(Data.id == data_id))
+                    ).scalar_one_or_none()
+
+                    if data_point is not None:
+                        data_point.name = file_metadata["name"]
+                        data_point.raw_data_location = file_metadata["file_path"]
+                        data_point.extension = file_metadata["extension"]
+                        data_point.mime_type = file_metadata["mime_type"]
+                        data_point.owner_id = user.id
+                        data_point.content_hash = file_metadata["content_hash"]
+                        data_point.external_metadata = get_external_metadata_dict(data_item)
+                        await session.merge(data_point)
+                    else:
+                        data_point = Data(
+                            id=data_id,
+                            name=file_metadata["name"],
+                            raw_data_location=file_metadata["file_path"],
+                            extension=file_metadata["extension"],
+                            mime_type=file_metadata["mime_type"],
+                            owner_id=user.id,
+                            content_hash=file_metadata["content_hash"],
+                            external_metadata=get_external_metadata_dict(data_item),
+                        )
+
+                    # Check if data is already in dataset
+                    dataset_data = (
+                        await session.execute(
+                            select(DatasetData).filter(
+                                DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
+                            )
+                        )
+                    ).scalar_one_or_none()
+                    # If data is not present in dataset add it
+                    if dataset_data is None:
+                        dataset.data.append(data_point)
 
-                    await session.merge(data)
-                    await session.commit()
-                else:
-                    data = Data(
-                        id=UUID(file_metadata["id"]),
-                        name=file_metadata["name"],
-                        raw_data_location=file_metadata["file_path"],
-                        extension=file_metadata["extension"],
-                        mime_type=file_metadata["mime_type"],
-                    )
-
-                    dataset.data.append(data)
                     await session.commit()
 
-                await give_permission_on_document(user, UUID(file_metadata["id"]), "read")
-                await give_permission_on_document(user, UUID(file_metadata["id"]), "write")
+                    await give_permission_on_document(user, data_id, "read")
+                    await give_permission_on_document(user, data_id, "write")
+        return file_paths
 
     send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
 
     db_engine = get_relational_engine()
 
+    file_paths = await data_storing(data, dataset_name, user)
+
     # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
     # can't be used inside the pipeline
     if db_engine.engine.dialect.name == "sqlite":
         # To use sqlite with dlt dataset_name must be set to "main".
         # Sqlite doesn't support schemas
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name="main",
             write_disposition="merge",
         )
     else:
+        # Data should be stored in the same schema to allow deduplication
        run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
-            dataset_name=dataset_name,
+            dataset_name="public",
             write_disposition="merge",
         )
-    await data_storing("file_metadata", dataset_name, user)
 
     send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
 
     return run_info
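
The rewritten `ingest_data` leans on two layers of deduplication: `ingestion.identify` derives `data_id` from the content hash plus the owner, and the dlt resource's `primary_key="id"` with `write_disposition="merge"` upserts rather than appends (hence the fixed `"public"` schema on non-sqlite backends). A rough sketch of such an id derivation — illustrative only, not the actual `ingestion.identify` implementation:

```python
import hashlib
from uuid import NAMESPACE_OID, UUID, uuid5


def derive_data_id(content: bytes, owner_id: UUID) -> UUID:
    # Hash the file contents, then scope the hash by owner so identical
    # files ingested by two users still map to distinct Data rows.
    content_hash = hashlib.md5(content).hexdigest()
    return uuid5(NAMESPACE_OID, f"{content_hash}-{owner_id}")
```
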
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
deleted file mode 100644
index 04396485c..000000000
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ /dev/null
@@ -1,145 +0,0 @@
-from typing import Any, List
-
-import dlt
-import cognee.modules.ingestion as ingestion
-from cognee.infrastructure.databases.relational import get_relational_engine
-from cognee.modules.data.methods import create_dataset
-from cognee.modules.data.models.DatasetData import DatasetData
-from cognee.modules.users.models import User
-from cognee.modules.users.permissions.methods import give_permission_on_document
-from cognee.shared.utils import send_telemetry
-from cognee.modules.data.operations.write_metadata import write_metadata
-from .get_dlt_destination import get_dlt_destination
-from .save_data_item_with_metadata_to_storage import (
-    save_data_item_with_metadata_to_storage,
-)
-
-
-async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
-    destination = get_dlt_destination()
-
-    pipeline = dlt.pipeline(
-        pipeline_name="file_load_from_filesystem",
-        destination=destination,
-    )
-
-    @dlt.resource(standalone=True, primary_key="id", merge_key="id")
-    async def data_resources(file_paths: List[str], user: User):
-        for file_path in file_paths:
-            with open(file_path.replace("file://", ""), mode="rb") as file:
-                classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data, user)
-                file_metadata = classified_data.get_metadata()
-                yield {
-                    "id": data_id,
-                    "name": file_metadata["name"],
-                    "file_path": file_metadata["file_path"],
-                    "extension": file_metadata["extension"],
-                    "mime_type": file_metadata["mime_type"],
-                    "content_hash": file_metadata["content_hash"],
-                    "owner_id": str(user.id),
-                }
-
-    async def data_storing(data: Any, dataset_name: str, user: User):
-        if not isinstance(data, list):
-            # Convert data to a list as we work with lists further down.
-            data = [data]
-
-        file_paths = []
-
-        # Process data
-        for data_item in data:
-            file_path = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
-
-            file_paths.append(file_path)
-
-            # Ingest data and add metadata
-            with open(file_path.replace("file://", ""), mode="rb") as file:
-                classified_data = ingestion.classify(file)
-
-                # data_id is the hash of file contents + owner id to avoid duplicate data
-                data_id = ingestion.identify(classified_data, user)
-
-                file_metadata = classified_data.get_metadata()
-
-                from sqlalchemy import select
-
-                from cognee.modules.data.models import Data
-
-                db_engine = get_relational_engine()
-
-                async with db_engine.get_async_session() as session:
-                    dataset = await create_dataset(dataset_name, user.id, session)
-
-                    # Check to see if data should be updated
-                    data_point = (
-                        await session.execute(select(Data).filter(Data.id == data_id))
-                    ).scalar_one_or_none()
-
-                    if data_point is not None:
-                        data_point.name = file_metadata["name"]
-                        data_point.raw_data_location = file_metadata["file_path"]
-                        data_point.extension = file_metadata["extension"]
-                        data_point.mime_type = file_metadata["mime_type"]
-                        data_point.owner_id = user.id
-                        data_point.content_hash = file_metadata["content_hash"]
-                        await session.merge(data_point)
-                    else:
-                        data_point = Data(
-                            id=data_id,
-                            name=file_metadata["name"],
-                            raw_data_location=file_metadata["file_path"],
-                            extension=file_metadata["extension"],
-                            mime_type=file_metadata["mime_type"],
-                            owner_id=user.id,
-                            content_hash=file_metadata["content_hash"],
-                        )
-
-                    # Check if data is already in dataset
-                    dataset_data = (
-                        await session.execute(
-                            select(DatasetData).filter(
-                                DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id
-                            )
-                        )
-                    ).scalar_one_or_none()
-                    # If data is not present in dataset add it
-                    if dataset_data is None:
-                        dataset.data.append(data_point)
-
-                    await session.commit()
-                    await write_metadata(data_item, data_point.id, file_metadata)
-
-                    await give_permission_on_document(user, data_id, "read")
-                    await give_permission_on_document(user, data_id, "write")
-        return file_paths
-
-    send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
-
-    db_engine = get_relational_engine()
-
-    file_paths = await data_storing(data, dataset_name, user)
-
-    # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
-    # can't be used inside the pipeline
-    if db_engine.engine.dialect.name == "sqlite":
-        # To use sqlite with dlt dataset_name must be set to "main".
-        # Sqlite doesn't support schemas
-        run_info = pipeline.run(
-            data_resources(file_paths, user),
-            table_name="file_metadata",
-            dataset_name="main",
-            write_disposition="merge",
-        )
-    else:
-        # Data should be stored in the same schema to allow deduplication
-        run_info = pipeline.run(
-            data_resources(file_paths, user),
-            table_name="file_metadata",
-            dataset_name="public",
-            write_disposition="merge",
-        )
-
-    send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
-
-    return run_info
diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py
index 9191f7ebc..3f9d572c9 100644
--- a/cognee/tasks/ingestion/save_data_item_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -1,12 +1,18 @@
-from typing import Union, BinaryIO
+from typing import Union, BinaryIO, Any
 
 from cognee.modules.ingestion.exceptions import IngestionError
 from cognee.modules.ingestion import save_data_to_file
 
 
-def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str) -> str:
+async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
+    if "llama_index" in str(type(data_item)):
+        # Dynamic import is used because the llama_index module is optional.
+        from .transform_data import get_data_from_llama_index
+
+        file_path = get_data_from_llama_index(data_item, dataset_name)
+
     # data is a file object coming from upload.
-    if hasattr(data_item, "file"):
+    elif hasattr(data_item, "file"):
         file_path = save_data_to_file(data_item.file, filename=data_item.filename)
 
     elif isinstance(data_item, str):
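
`save_data_item_to_storage` absorbs the llama_index branch from the deleted metadata-aware variant below, so a single async entry point now covers raw text, file paths, uploads, and llama_index documents. A usage sketch, assuming the optional `llama_index` package is installed and exposes `Document` at this import path:

```python
from llama_index.core import Document

from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage


async def store_examples(dataset_name: str) -> list[str]:
    items = [
        "Raw text is written to a new storage file.",
        "file:///data/existing.txt",  # file paths are passed through
        Document(text="llama_index items take the dynamic-import branch."),
    ]
    return [await save_data_item_to_storage(item, dataset_name) for item in items]
```
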
diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
deleted file mode 100644
index 92697abb7..000000000
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from typing import Union, BinaryIO, Any
-
-from cognee.modules.ingestion.exceptions import IngestionError
-from cognee.modules.ingestion import save_data_to_file
-
-
-async def save_data_item_with_metadata_to_storage(
-    data_item: Union[BinaryIO, str, Any], dataset_name: str
-) -> str:
-    if "llama_index" in str(type(data_item)):
-        # Dynamic import is used because the llama_index module is optional.
-        from .transform_data import get_data_from_llama_index
-
-        file_path = get_data_from_llama_index(data_item, dataset_name)
-
-    # data is a file object coming from upload.
-    elif hasattr(data_item, "file"):
-        file_path = save_data_to_file(data_item.file, filename=data_item.filename)
-
-    elif isinstance(data_item, str):
-        # data is a file path
-        if data_item.startswith("file://") or data_item.startswith("/"):
-            file_path = data_item.replace("file://", "")
-        # data is text
-        else:
-            file_path = save_data_to_file(data_item)
-    else:
-        raise IngestionError(message=f"Data type not supported: {type(data_item)}")
-
-    return file_path
diff --git a/cognee/tasks/ingestion/save_data_to_storage.py b/cognee/tasks/ingestion/save_data_to_storage.py
deleted file mode 100644
index a56857261..000000000
--- a/cognee/tasks/ingestion/save_data_to_storage.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from typing import Union, BinaryIO
-from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage
-
-
-def save_data_to_storage(data: Union[BinaryIO, str], dataset_name) -> list[str]:
-    if not isinstance(data, list):
-        # Convert data to a list as we work with lists further down.
-        data = [data]
-
-    file_paths = []
-
-    for data_item in data:
-        file_path = save_data_item_to_storage(data_item, dataset_name)
-        file_paths.append(file_path)
-
-    return file_paths
diff --git a/cognee/tests/integration/documents/AudioDocument_test.py b/cognee/tests/integration/documents/AudioDocument_test.py
index e07a2431b..9719d90fc 100644
--- a/cognee/tests/integration/documents/AudioDocument_test.py
+++ b/cognee/tests/integration/documents/AudioDocument_test.py
@@ -29,7 +29,7 @@ def test_AudioDocument():
         id=uuid.uuid4(),
         name="audio-dummy-test",
         raw_data_location="",
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="",
     )
     with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT):
diff --git a/cognee/tests/integration/documents/ImageDocument_test.py b/cognee/tests/integration/documents/ImageDocument_test.py
index b8d585419..bd15961ee 100644
--- a/cognee/tests/integration/documents/ImageDocument_test.py
+++ b/cognee/tests/integration/documents/ImageDocument_test.py
@@ -18,7 +18,7 @@ def test_ImageDocument():
         id=uuid.uuid4(),
         name="image-dummy-test",
         raw_data_location="",
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="",
     )
     with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT):
diff --git a/cognee/tests/integration/documents/PdfDocument_test.py b/cognee/tests/integration/documents/PdfDocument_test.py
index fc4307846..82d304b6c 100644
--- a/cognee/tests/integration/documents/PdfDocument_test.py
+++ b/cognee/tests/integration/documents/PdfDocument_test.py
@@ -20,7 +20,7 @@ def test_PdfDocument():
         id=uuid.uuid4(),
         name="Test document.pdf",
         raw_data_location=test_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="",
     )
diff --git a/cognee/tests/integration/documents/TextDocument_test.py b/cognee/tests/integration/documents/TextDocument_test.py
index 6daec62b7..17db39be8 100644
--- a/cognee/tests/integration/documents/TextDocument_test.py
+++ b/cognee/tests/integration/documents/TextDocument_test.py
@@ -32,7 +32,7 @@ def test_TextDocument(input_file, chunk_size):
         id=uuid.uuid4(),
         name=input_file,
         raw_data_location=test_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="",
     )
diff --git a/cognee/tests/integration/documents/UnstructuredDocument_test.py b/cognee/tests/integration/documents/UnstructuredDocument_test.py
index 773dc2293..81e804f07 100644
--- a/cognee/tests/integration/documents/UnstructuredDocument_test.py
+++ b/cognee/tests/integration/documents/UnstructuredDocument_test.py
@@ -39,7 +39,7 @@ def test_UnstructuredDocument():
         id=uuid.uuid4(),
         name="example.pptx",
         raw_data_location=pptx_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
     )
 
@@ -47,7 +47,7 @@ def test_UnstructuredDocument():
         id=uuid.uuid4(),
         name="example.docx",
         raw_data_location=docx_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
     )
 
@@ -55,7 +55,7 @@ def test_UnstructuredDocument():
         id=uuid.uuid4(),
         name="example.csv",
         raw_data_location=csv_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="text/csv",
     )
 
@@ -63,7 +63,7 @@ def test_UnstructuredDocument():
         id=uuid.uuid4(),
         name="example.xlsx",
         raw_data_location=xlsx_file_path,
-        metadata_id=uuid.uuid4(),
+        external_metadata="",
         mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
     )
diff --git a/notebooks/cognee_llama_index.ipynb b/notebooks/cognee_llama_index.ipynb
index c7c28af5f..82db4d4ae 100644
--- a/notebooks/cognee_llama_index.ipynb
+++ b/notebooks/cognee_llama_index.ipynb
@@ -118,10 +118,10 @@
    ]
   },
   {
-   "cell_type": "code",
-   "execution_count": null,
    "metadata": {},
+   "cell_type": "code",
    "outputs": [],
+   "execution_count": null,
    "source": [
     "from typing import Union, BinaryIO\n",
     "\n",
@@ -133,7 +133,7 @@
     ")\n",
     "from cognee.modules.users.models import User\n",
     "from cognee.modules.users.methods import get_default_user\n",
-    "from cognee.tasks.ingestion.ingest_data_with_metadata import ingest_data_with_metadata\n",
+    "from cognee.tasks.ingestion.ingest_data import ingest_data\n",
     "import cognee\n",
     "\n",
     "# Create a clean slate for cognee -- reset data and system state\n",
@@ -153,7 +153,7 @@
     "    if user is None:\n",
     "        user = await get_default_user()\n",
     "\n",
-    "    await ingest_data_with_metadata(data, dataset_name, user)\n",
+    "    await ingest_data(data, dataset_name, user)\n",
     "\n",
     "\n",
     "await add(documents)\n",