Fix dlt for metadata #247

Merged · 3 commits · Dec 4, 2024
63 changes: 45 additions & 18 deletions cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -15,7 +15,6 @@
)



async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = get_dlt_destination()

@@ -24,18 +23,36 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = destination,
)

@dlt.resource(standalone = True, merge_key = "id")
async def data_resources(data: Any, user: User):
@dlt.resource(standalone=True, merge_key="id")
async def data_resources(file_paths: str):

⚠️ Potential issue

Correct the type annotation for file_paths in data_resources.

The file_paths parameter should be annotated as a list of strings (List[str]), not str, since it is iterated over in the function.

Apply this diff to fix the type annotation:

-from typing import Any
+from typing import Any, List

 @dlt.resource(standalone=True, merge_key="id")
-async def data_resources(file_paths: str):
+async def data_resources(file_paths: List[str]):

Committable suggestion skipped: line range outside the PR's diff.
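For reference, this is how the resource would read with the suggested annotation applied — a sketch only, not part of the PR as merged; the decorator and body are taken from this diff, while the import path for the ingestion helpers is assumed:

    from typing import List

    import dlt
    import cognee.modules.ingestion as ingestion  # assumed import path; the module is already used in this file

    @dlt.resource(standalone=True, merge_key="id")
    async def data_resources(file_paths: List[str]):
        # Iterate over the locally saved files and yield one metadata row per file.
        for file_path in file_paths:
            with open(file_path.replace("file://", ""), mode="rb") as file:
                classified_data = ingestion.classify(file)
                data_id = ingestion.identify(classified_data)
                file_metadata = classified_data.get_metadata()
                yield {
                    "id": data_id,
                    "name": file_metadata["name"],
                    "file_path": file_metadata["file_path"],
                    "extension": file_metadata["extension"],
                    "mime_type": file_metadata["mime_type"],
                }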

for file_path in file_paths:
with open(file_path.replace("file://", ""), mode="rb") as file:
classified_data = ingestion.classify(file)
data_id = ingestion.identify(classified_data)
file_metadata = classified_data.get_metadata()
yield {
"id": data_id,
"name": file_metadata["name"],
"file_path": file_metadata["file_path"],
"extension": file_metadata["extension"],
"mime_type": file_metadata["mime_type"],
}

async def data_storing(data: Any, dataset_name: str, user: User):
if not isinstance(data, list):
# Convert data to a list as we work with lists further down.
data = [data]

file_paths = []

# Process data
for data_item in data:
file_path = await save_data_item_with_metadata_to_storage(
data_item, dataset_name
)

file_paths.append(file_path)

# Ingest data and add metadata
with open(file_path.replace("file://", ""), mode = "rb") as file:
classified_data = ingestion.classify(file)
@@ -77,25 +94,35 @@ async def data_resources(data: Any, user: User):
await session.commit()
await write_metadata(data_item, data_point.id, file_metadata)


yield {
"id": data_id,
"name": file_metadata["name"],
"file_path": file_metadata["file_path"],
"extension": file_metadata["extension"],
"mime_type": file_metadata["mime_type"],
}

await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
return file_paths

send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
run_info = pipeline.run(
data_resources(data, user),
table_name = "file_metadata",
dataset_name = dataset_name,
write_disposition = "merge",
)

db_engine = get_relational_engine()

file_paths = await data_storing(data, dataset_name, user)

# Note: DLT pipeline has its own event loop, therefore objects created in another event loop
# can't be used inside the pipeline
if db_engine.engine.dialect.name == "sqlite":
# To use sqlite with dlt dataset_name must be set to "main".
# Sqlite doesn't support schemas
run_info = pipeline.run(
data_resources(file_paths),
table_name="file_metadata",
dataset_name="main",
write_disposition="merge",
)
else:
run_info = pipeline.run(
data_resources(file_paths),
table_name="file_metadata",
dataset_name=dataset_name,
write_disposition="merge",
)

send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)

return run_info
@@ -7,10 +7,9 @@
async def save_data_item_with_metadata_to_storage(
data_item: Union[BinaryIO, str, Any], dataset_name: str
) -> str:
# Dynamic import is used because the llama_index module is optional.
# For the same reason Any is accepted as a data item
# Check if data is of type Document or any of it's subclasses
if str(type(data_item)).startswith("llama_index"):

if "llama_index" in str(type(data_item)):

🛠️ Refactor suggestion

Improve type checking for data_item to avoid using str(type(data_item)).

Using str(type(data_item)) for type checking is not robust and can lead to maintenance issues. Consider using isinstance, duck typing, or checking for specific attributes.

Since llama_index is an optional dependency, you can perform type checking as follows:

try:
    from llama_index import LlamaIndexType  # Replace with the actual type
except ImportError:
    LlamaIndexType = None

if LlamaIndexType and isinstance(data_item, LlamaIndexType):
    # Process data_item

Alternatively, use attribute checks:

if hasattr(data_item, 'some_unique_attribute'):
    # Process data_item

This approach is more reliable and easier to maintain.
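
As a concrete variant of the first suggestion, the optional import could target llama_index's Document class; the import path below is an assumption (it differs between llama_index versions) and is not part of this PR:

    # Sketch only: the LlamaIndexDocument alias and its import path are assumptions.
    try:
        from llama_index.core import Document as LlamaIndexDocument
    except ImportError:
        LlamaIndexDocument = None

    if LlamaIndexDocument is not None and isinstance(data_item, LlamaIndexDocument):
        from .transform_data import get_data_from_llama_index
        file_path = get_data_from_llama_index(data_item, dataset_name)

This keeps the optional dependency truly optional while avoiding string matching on type names.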

# Dynamic import is used because the llama_index module is optional.
from .transform_data import get_data_from_llama_index

file_path = get_data_from_llama_index(data_item, dataset_name)