From 61aebf79e0ee924452185d7a25b0d75838f191ef Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Wed, 4 Dec 2024 11:14:30 +0100
Subject: [PATCH 1/2] fix: Resolve issue with dlt for ingest_data_with_metadata

Resolve the issue caused by dlt in the ingest_data_with_metadata task:
move data storing out of the dlt resource, pass plain file paths into
the resource instead, and use the "main" dataset on SQLite, which has
no schema support.
---
 .../ingestion/ingest_data_with_metadata.py    | 63 +++++++++++++------
 1 file changed, 45 insertions(+), 18 deletions(-)

diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
index abd3c9f9..b820e623 100644
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -15,7 +15,6 @@
 )
 
 
-
 async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
 
@@ -24,18 +23,36 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
         destination = destination,
     )
 
-    @dlt.resource(standalone = True, merge_key = "id")
-    async def data_resources(data: Any, user: User):
+    @dlt.resource(standalone=True, merge_key="id")
+    async def data_resources(file_paths: str):
+        for file_path in file_paths:
+            with open(file_path.replace("file://", ""), mode="rb") as file:
+                classified_data = ingestion.classify(file)
+                data_id = ingestion.identify(classified_data)
+                file_metadata = classified_data.get_metadata()
+                yield {
+                    "id": data_id,
+                    "name": file_metadata["name"],
+                    "file_path": file_metadata["file_path"],
+                    "extension": file_metadata["extension"],
+                    "mime_type": file_metadata["mime_type"],
+                }
+
+    async def data_storing(data: Any, dataset_name: str, user: User):
         if not isinstance(data, list):
             # Convert data to a list as we work with lists further down.
             data = [data]
 
+        file_paths = []
+
         # Process data
         for data_item in data:
             file_path = await save_data_item_with_metadata_to_storage(
                 data_item, dataset_name
             )
+            file_paths.append(file_path)
+
             # Ingest data and add metadata
             with open(file_path.replace("file://", ""), mode = "rb") as file:
                 classified_data = ingestion.classify(file)
@@ -77,25 +94,35 @@ async def data_resources(data: Any, user: User):
                 await session.commit()
                 await write_metadata(data_item, data_point.id, file_metadata)
 
-
-                yield {
-                    "id": data_id,
-                    "name": file_metadata["name"],
-                    "file_path": file_metadata["file_path"],
-                    "extension": file_metadata["extension"],
-                    "mime_type": file_metadata["mime_type"],
-                }
-
                 await give_permission_on_document(user, data_id, "read")
                 await give_permission_on_document(user, data_id, "write")
 
+        return file_paths
+
     send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
-    run_info = pipeline.run(
-        data_resources(data, user),
-        table_name = "file_metadata",
-        dataset_name = dataset_name,
-        write_disposition = "merge",
-    )
+
+    db_engine = get_relational_engine()
+
+    file_paths = await data_storing(data, dataset_name, user)
+
+    # Note: DLT pipeline has its own event loop, therefore objects created in another event loop
+    # can't be used inside the pipeline
+    if db_engine.engine.dialect.name == "sqlite":
+        # To use sqlite with dlt dataset_name must be set to "main".
+        # Sqlite doesn't support schemas
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name="file_metadata",
+            dataset_name="main",
+            write_disposition="merge",
+        )
+    else:
+        run_info = pipeline.run(
+            data_resources(file_paths),
+            table_name="file_metadata",
+            dataset_name=dataset_name,
+            write_disposition="merge",
+        )
 
     send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
 
     return run_info

From ceebcdb2517d6da4915dbad3e696598a27495cc7 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Wed, 4 Dec 2024 11:29:27 +0100
Subject: [PATCH 2/2] fix: Resolve issue with llama index type resolution

Resolve the issue with llama_index type resolution: detect llama_index
data items by checking for "llama_index" anywhere in the type's string
representation instead of only at its start.
---
 .../ingestion/save_data_item_with_metadata_to_storage.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
index ef1ce4ec..d758ebcd 100644
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
@@ -7,10 +7,9 @@
 async def save_data_item_with_metadata_to_storage(
     data_item: Union[BinaryIO, str, Any], dataset_name: str
 ) -> str:
-    # Dynamic import is used because the llama_index module is optional.
-    # For the same reason Any is accepted as a data item
-    # Check if data is of type Document or any of it's subclasses
-    if str(type(data_item)).startswith("llama_index"):
+
+    if "llama_index" in str(type(data_item)):
+        # Dynamic import is used because the llama_index module is optional.
        from .transform_data import get_data_from_llama_index

        file_path = get_data_from_llama_index(data_item, dataset_name)
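
Usage sketch (illustrative, not part of the patch series): the first
patch boils down to two rules. Hand the dlt resource only plain file
paths, because dlt runs the pipeline in its own event loop and objects
bound to the caller's loop (sessions, engines) must not cross into it;
and pin dataset_name to "main" on SQLite, which has no schemas. The
sketch below assumes a recent dlt release with the sqlalchemy
destination; the pipeline name, database URL, and sample paths are
invented for illustration, and merge support on the sqlalchemy
destination depends on the dlt version.

    import dlt

    # Standalone resource merged on "id". It receives plain strings, so
    # nothing created in the caller's event loop crosses into dlt's.
    @dlt.resource(standalone=True, merge_key="id")
    def file_metadata(file_paths):
        for file_path in file_paths:
            yield {"id": file_path, "file_path": file_path}

    pipeline = dlt.pipeline(
        pipeline_name="metadata_extraction",  # assumed name
        destination=dlt.destinations.sqlalchemy("sqlite:///demo.db"),  # assumed URL
    )

    # SQLite exposes only the built-in "main" schema, so the dlt dataset
    # must be "main"; any other dataset_name would imply a schema that
    # SQLite cannot create.
    run_info = pipeline.run(
        file_metadata(["/tmp/a.txt", "/tmp/b.txt"]),
        table_name="file_metadata",
        dataset_name="main",
        write_disposition="merge",
    )
    print(run_info)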