-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cog 793 metadata rework #460
Conversation
Have foreign metadata be a table column in data instead of it's own table to reduce complexity Refactor COG-793
WalkthroughThis pull request introduces significant changes to the data and metadata handling in the Cognee system. The modifications primarily involve removing the Changes
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
Resolve issue with LlamaIndex notebook after refactor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 10
🧹 Nitpick comments (5)
cognee/tasks/ingestion/ingest_data.py (2)
126-127
: Consider combining permission grants into a single callOn lines 126-127, permissions are granted separately:
await give_permission_on_document(user, data_id, "read") await give_permission_on_document(user, data_id, "write")If the
give_permission_on_document
function supports multiple permissions at once, consider combining them to reduce the number of calls:await give_permission_on_document(user, data_id, ["read", "write"])
134-135
: Variabledb_engine
is unused after assignmentOn line 134,
db_engine
is assigned but not used in the subsequent code:db_engine = get_relational_engine()If
db_engine
is not needed beyond this point, consider removing the assignment to clean up the code.- db_engine = get_relational_engine()
Alternatively, if it is needed elsewhere, ensure it's utilized appropriately.
cognee/tasks/ingestion/save_data_item_to_storage.py (1)
7-7
: Consider using a more specific type than Any.While
Any
provides flexibility, consider creating a custom type or protocol for supported data items to maintain type safety and improve code documentation.-from typing import Union, BinaryIO, Any +from typing import Union, BinaryIO, Protocol + +class DataItem(Protocol): + """Protocol for supported data items in storage.""" + def __str__(self) -> str: ... + +class LlamaIndexItem(DataItem): + """Type hint for LlamaIndex data items.""" + pass -async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str: +async def save_data_item_to_storage(data_item: Union[BinaryIO, str, LlamaIndexItem], dataset_name: str) -> str:cognee/tests/integration/documents/UnstructuredDocument_test.py (1)
42-42
: Enhance test coverage for foreign_metadata.The test currently uses empty strings for
foreign_metadata
across all document types. Consider adding test cases with:
- Non-empty foreign metadata
- Different metadata structures (e.g., JSON objects)
- Edge cases (e.g., very large metadata)
Also applies to: 50-50, 58-58, 66-66
notebooks/cognee_llama_index.ipynb (1)
156-159
: Document the new metadata handling approach.Since this notebook serves as an example, it would be helpful to add documentation showing how Foreign Metadata from LlamaIndex Documents appears in the graph visualization. Consider adding a markdown cell after the graph rendering to explain this.
Add a markdown cell like this:
await add(documents) # Use LLMs and cognee to create knowledge graph await cognee.cognify() +``` + +## Understanding Foreign Metadata in the Graph + +When using LlamaIndex Documents, their metadata is now displayed as part of the Document node's properties in the graph. You can observe this in the visualization above, where each Document node includes the original LlamaIndex metadata. +```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (23)
cognee/api/v1/add/add_v2.py
(2 hunks)cognee/api/v1/cognify/code_graph_pipeline.py
(2 hunks)cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py
(1 hunks)cognee/modules/chunking/TextChunker.py
(1 hunks)cognee/modules/data/models/Data.py
(2 hunks)cognee/modules/data/models/Metadata.py
(0 hunks)cognee/modules/data/operations/delete_metadata.py
(0 hunks)cognee/modules/data/operations/get_metadata.py
(0 hunks)cognee/modules/data/operations/write_metadata.py
(0 hunks)cognee/modules/data/processing/document_types/Document.py
(1 hunks)cognee/tasks/documents/classify_documents.py
(2 hunks)cognee/tasks/ingestion/__init__.py
(1 hunks)cognee/tasks/ingestion/ingest_data.py
(1 hunks)cognee/tasks/ingestion/ingest_data_with_metadata.py
(0 hunks)cognee/tasks/ingestion/save_data_item_to_storage.py
(1 hunks)cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
(0 hunks)cognee/tasks/ingestion/save_data_to_storage.py
(0 hunks)cognee/tests/integration/documents/AudioDocument_test.py
(1 hunks)cognee/tests/integration/documents/ImageDocument_test.py
(1 hunks)cognee/tests/integration/documents/PdfDocument_test.py
(1 hunks)cognee/tests/integration/documents/TextDocument_test.py
(1 hunks)cognee/tests/integration/documents/UnstructuredDocument_test.py
(1 hunks)notebooks/cognee_llama_index.ipynb
(3 hunks)
💤 Files with no reviewable changes (7)
- cognee/modules/data/operations/get_metadata.py
- cognee/tasks/ingestion/save_data_to_storage.py
- cognee/modules/data/models/Metadata.py
- cognee/modules/data/operations/delete_metadata.py
- cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
- cognee/tasks/ingestion/ingest_data_with_metadata.py
- cognee/modules/data/operations/write_metadata.py
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: windows-latest
- GitHub Check: docker-compose-test
🔇 Additional comments (12)
cognee/tasks/ingestion/ingest_data.py (2)
142-152
: Ensure consistent dataset names for different database enginesThe dataset names differ between SQLite and other databases:
- SQLite uses
"main"
as thedataset_name
.- Other databases use
"public"
as thedataset_name
.Confirm that this discrepancy is intentional and that downstream processes handle these dataset names correctly.
29-36
: Improve warning message and ensure consistent metadata handlingIn the
get_foreign_metadata_dict
function, there's a couple of issues to address:
- Ellipsis in warning message: The warning message on line 34 includes an ellipsis (
...
), which may be unintentional and could confuse the reader.- Returning an empty dictionary: When the
data_item
does not have adict
method, the function returns an empty dictionary. This might lead to unexpected behavior downstream if not handled properly.Apply this diff to improve the warning message and consider if returning an empty dictionary is appropriate:
def get_foreign_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]: if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): return {"metadata": data_item.dict(), "origin": str(type(data_item))} else: - warnings.warn( - f"Data of type {type(data_item)}... does not have dict method. Returning empty metadata." - ) + warnings.warn( + f"Data of type {type(data_item)} does not have a 'dict' method. Returning empty metadata." + ) return {}Ensure that functions consuming
foreign_metadata
can handle cases where it's an empty dictionary.✅ Verification successful
Warning message improvement is safe to apply, empty metadata handling is correct
The codebase is designed to handle empty metadata safely:
- The
foreign_metadata
field is optional in the Document model- Empty metadata is wrapped in a tuple for additional safety
- No evidence of code requiring non-empty metadata
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash # Find calls to get_foreign_metadata_dict and its usage context ast-grep --pattern 'get_foreign_metadata_dict($_)' # Search for direct references to the function rg "get_foreign_metadata_dict" -A 3 -B 3 # Look for metadata dictionary access patterns rg "foreign_metadata\[" -A 2 -B 2 rg "metadata.*dict" -A 2 -B 2Length of output: 48549
cognee/tasks/ingestion/__init__.py (1)
2-2
: Update import statements to reflect refactoringThe import of
ingest_data
replaces the previous imports of deprecated functions. This change aligns with the refactoring objectives to simplify the ingestion process.cognee/modules/data/models/Data.py (1)
3-4
: LGTM: Import changes align with new schema requirements.The addition of JSON type import supports the new foreign_metadata column.
cognee/api/v1/add/add_v2.py (2)
25-25
: LGTM: Task pipeline update is consistent.The task list correctly reflects the simplified ingestion process.
5-5
: Verify the impact of removing metadata-specific ingestion.The change from
ingest_data_with_metadata
toingest_data
aligns with the metadata rework. However, we should verify:
- Handling of existing metadata during ingestion
- Impact on downstream processes expecting metadata
Let's check for dependencies and usages:
✅ Verification successful
Metadata handling is properly maintained in the new implementation
The consolidation of
ingest_data_with_metadata
intoingest_data
is a safe refactoring that preserves all metadata functionality. The new implementation:
- Captures and stores all file metadata (name, path, extension, mime_type, hash)
- Preserves custom metadata through foreign_metadata field
- Maintains backward compatibility with existing data structures
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for references to metadata ingestion rg -g '!*.pyc' 'ingest_data_with_metadata' # Check for potential impacts on downstream processes rg -g '!*.pyc' -A 5 'metadata.*process|process.*metadata'Length of output: 37394
Script:
#!/bin/bash # Check implementation of ingest_data and metadata handling ast-grep --pattern 'def ingest_data($$$)' # Look for metadata handling patterns in ingestion module rg -g '!*.pyc' -A 5 'metadata.*ingest|ingest.*metadata' cognee/tasks/ingestion/Length of output: 125
Script:
#!/bin/bash # Find ingestion implementation with broader pattern fd . cognee/tasks/ingestion -t f -x cat {} # Search for any metadata handling patterns rg -g '!*.pyc' 'metadata' cognee/tasks/Length of output: 17095
cognee/tests/integration/documents/AudioDocument_test.py (1)
32-32
: Similar improvements needed as in TextDocument_test.py.See the review comment in TextDocument_test.py about adding test cases for foreign_metadata handling.
cognee/tests/integration/documents/ImageDocument_test.py (1)
21-21
: Similar improvements needed as in TextDocument_test.py.See the review comment in TextDocument_test.py about adding test cases for foreign_metadata handling.
cognee/api/v1/cognify/code_graph_pipeline.py (1)
13-13
: LGTM! Changes align with metadata simplification objective.The transition from
ingest_data_with_metadata
toingest_data
is clean and maintains the existing task configuration structure.Also applies to: 71-71
notebooks/cognee_llama_index.ipynb (3)
122-124
: Cell metadata changes look good.These changes reset the cell execution state, which is a good practice for notebooks that will be shared or committed to version control.
136-136
: Verify backward compatibility of the new ingestion method.The change from
ingest_data_with_metadata
toingest_data
aligns with the PR's goal of simplifying metadata handling. However, we should verify that this change maintains compatibility with existing LlamaIndex documents.Let's check if there are any other notebooks or examples that might be affected:
✅ Verification successful
Import change is consistent and properly migrated
The change from
ingest_data_with_metadata
toingest_data
has been properly implemented across the codebase. The new function is consistently used in all relevant places, and there are no remaining references to the old implementation.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for any remaining references to the old ingestion method rg "ingest_data_with_metadata" --type py --type jupyterLength of output: 55
Script:
#!/bin/bash # Check the implementation of ingest_data and its usages ast-grep --pattern 'def ingest_data($$$)' # Find other references to ingest_data to understand usage patterns rg "ingest_data" -A 3 --type py --type jupyterLength of output: 2465
156-156
: Verify preservation of LlamaIndex Document metadata.Since this PR changes how Foreign Metadata is handled, we should verify that the LlamaIndex Document metadata is properly preserved in the Document node's graph property.
Let's check the implementation:
✅ Verification successful
Metadata handling is properly implemented and preserved
The change from
ingest_data_with_metadata
toingest_data
is safe as the metadata from LlamaIndex Documents is automatically extracted and stored in theforeign_metadata
field through theget_foreign_metadata_dict
helper function.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check how LlamaIndex Document metadata is handled in the new implementation ast-grep --pattern 'def ingest_data($$$): $$$ # Look for metadata handling $$$ 'Length of output: 88
Script:
#!/bin/bash # Find the ingest_data function definition rg "def ingest_data" -A 10 # Also search for metadata handling near ingest_data rg "ingest_data.*metadata|metadata.*ingest_data" -A 5Length of output: 989
Script:
#!/bin/bash # Get complete implementation of ingest_data function cat cognee/tasks/ingestion/ingest_data.py # Search for LlamaIndex Document handling rg -l "from llama_index" rg "Document.*metadata" -A 5Length of output: 44109
data_point.mime_type = file_metadata["mime_type"] | ||
data_point.owner_id = user.id | ||
data_point.content_hash = file_metadata["content_hash"] | ||
data_point.foreign_metadata = (get_foreign_metadata_dict(data_item),) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assigning tuple to foreign_metadata
may cause data type issues
On line 98, foreign_metadata
is assigned as a tuple containing a dictionary:
data_point.foreign_metadata = (get_foreign_metadata_dict(data_item),)
This assignment creates a tuple with one element, which may not be the intended data type for foreign_metadata
. Based on the DataPoint
model, foreign_metadata
is expected to be an optional string.
To fix this issue, remove the parentheses and comma to assign the dictionary directly:
- data_point.foreign_metadata = (get_foreign_metadata_dict(data_item),)
+ data_point.foreign_metadata = get_foreign_metadata_dict(data_item)
Or, if foreign_metadata
should be a JSON string, serialize it:
+ import json
...
- data_point.foreign_metadata = (get_foreign_metadata_dict(data_item),)
+ data_point.foreign_metadata = json.dumps(get_foreign_metadata_dict(data_item))
mime_type=file_metadata["mime_type"], | ||
owner_id=user.id, | ||
content_hash=file_metadata["content_hash"], | ||
foreign_metadata=get_foreign_metadata_dict(data_item), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure correct assignment of foreign_metadata
when creating Data
object
Similar to the previous issue, on line 109, foreign_metadata
is assigned:
foreign_metadata=get_foreign_metadata_dict(data_item),
The trailing comma may inadvertently create a tuple, leading to potential data type issues.
Apply this diff to correct the assignment:
- foreign_metadata=get_foreign_metadata_dict(data_item),
+ foreign_metadata=get_foreign_metadata_dict(data_item)
Or serialize if necessary:
+ import json
...
- foreign_metadata=get_foreign_metadata_dict(data_item)
+ foreign_metadata=json.dumps(get_foreign_metadata_dict(data_item))
@@ -7,7 +7,7 @@ | |||
class Document(DataPoint): | |||
name: str | |||
raw_data_location: str | |||
metadata_id: UUID | |||
foreign_metadata: Optional[str] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential type mismatch for foreign_metadata
attribute
On line 10, foreign_metadata
is defined as:
foreign_metadata: Optional[str]
However, elsewhere in the code (e.g., ingest_data.py
), foreign_metadata
is assigned a dictionary, which may cause type inconsistencies.
Consider changing the type annotation to accept a dictionary or ensure that foreign_metadata
is always a string (e.g., by serializing the dictionary to JSON):
- foreign_metadata: Optional[str]
+ from typing import Optional, Dict, Any
+ foreign_metadata: Optional[Dict[str, Any]]
Or, if foreign_metadata
should remain a string, ensure all assignments serialize the data properly:
foreign_metadata=json.dumps(get_foreign_metadata_dict(data_item))
cognee/modules/data/models/Data.py
Outdated
@@ -21,6 +19,7 @@ class Data(Base): | |||
raw_data_location = Column(String) | |||
owner_id = Column(UUID, index=True) | |||
content_hash = Column(String) | |||
foreign_metadata = Column(JSON) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Consider adding schema validation for foreign_metadata - Confirmed
The codebase shows that foreign_metadata
follows an implicit schema {"metadata": dict, "origin": str}
but lacks validation. Current implementation silently accepts invalid data, which could lead to inconsistencies. Recommended changes:
- Add schema validation using Pydantic or similar
- Document the expected format in the model's docstring
- Replace the warning with proper error handling for invalid metadata
🔗 Analysis chain
Consider adding schema validation for foreign_metadata.
While using JSON type provides flexibility, consider:
- Adding schema validation to ensure consistent metadata structure
- Documenting the expected format
- Adding migration strategy for existing metadata
Let's check for any existing schema validation or documentation:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for schema validation or documentation
rg -g '!*.pyc' -A 5 'foreign_metadata.*schema|metadata.*schema'
# Check for migration files
fd -g 'migrations/*.py' -x cat {}
Length of output: 2493
Script:
#!/bin/bash
# Get the complete Data.py file
cat cognee/modules/data/models/Data.py
# Search for any validation logic or usage of foreign_metadata
rg -g '!*.pyc' -A 5 'foreign_metadata'
# Look for any documentation files
fd -e md -e rst -x grep -l -i "metadata" {} \;
Length of output: 9265
if "llama_index" in str(type(data_item)): | ||
# Dynamic import is used because the llama_index module is optional. | ||
from .transform_data import get_data_from_llama_index | ||
|
||
file_path = get_data_from_llama_index(data_item, dataset_name) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for llama_index integration.
The dynamic import and processing of llama_index data should include proper error handling:
- Import failures
- Data processing errors
- Invalid llama_index data
if "llama_index" in str(type(data_item)):
- # Dynamic import is used because the llama_index module is optional.
- from .transform_data import get_data_from_llama_index
+ try:
+ # Dynamic import is used because the llama_index module is optional.
+ from .transform_data import get_data_from_llama_index
+ except ImportError as e:
+ raise IngestionError(f"LlamaIndex support not available: {str(e)}")
+
+ try:
+ file_path = get_data_from_llama_index(data_item, dataset_name)
+ except Exception as e:
+ raise IngestionError(f"Failed to process LlamaIndex data: {str(e)}")
- file_path = get_data_from_llama_index(data_item, dataset_name)
Committable suggestion skipped: line range outside the PR's diff.
@@ -20,7 +20,7 @@ def test_PdfDocument(): | |||
id=uuid.uuid4(), | |||
name="Test document.pdf", | |||
raw_data_location=test_file_path, | |||
metadata_id=uuid.uuid4(), | |||
foreign_metadata="", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance test coverage for foreign_metadata scenarios.
Current test uses an empty string for foreign_metadata, but should include tests for:
- Valid JSON metadata
- Different metadata structures
- Edge cases (null, invalid JSON)
Example test cases to add:
def test_PdfDocument_with_metadata():
metadata_cases = [
{"source": "llama_index", "metadata": {"key": "value"}},
{"source": "external", "tags": ["test", "pdf"]},
None,
{"invalid": float('inf')}, # Should handle JSON serialization edge cases
]
for metadata in metadata_cases:
document = PdfDocument(
id=uuid.uuid4(),
name="Test document.pdf",
raw_data_location=test_file_path,
foreign_metadata=metadata,
mime_type="",
)
# Add assertions for metadata handling
@@ -32,7 +32,7 @@ def test_TextDocument(input_file, chunk_size): | |||
id=uuid.uuid4(), | |||
name=input_file, | |||
raw_data_location=test_file_path, | |||
metadata_id=uuid.uuid4(), | |||
foreign_metadata="", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add test cases for foreign_metadata handling.
The test currently initializes foreign_metadata
with an empty string but doesn't verify how the document handles various foreign metadata scenarios. Consider adding test cases for:
- Valid JSON metadata
- Empty metadata
- Invalid JSON metadata
Example test case:
@pytest.mark.parametrize(
"foreign_metadata,expected_result",
[
('{"source": "external", "tags": ["test"]}', True),
("", True),
("invalid json", False),
],
)
def test_TextDocument_foreign_metadata(foreign_metadata, expected_result):
document = TextDocument(
id=uuid.uuid4(),
name="test.txt",
raw_data_location="test_path",
foreign_metadata=foreign_metadata,
mime_type=""
)
assert hasattr(document, "foreign_metadata") == expected_result
@@ -1,4 +1,5 @@ | |||
from cognee.modules.data.models import Data | |||
import json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for JSON serialization and remove unnecessary indentation.
The current implementation has two potential issues:
- No error handling for JSON serialization failures
- Unnecessary overhead from pretty-printing JSON with indent=4
Consider this implementation:
async def classify_documents(data_documents: list[Data]) -> list[Document]:
documents = []
for data_item in data_documents:
+ try:
+ metadata_json = json.dumps(data_item.foreign_metadata)
+ except (TypeError, ValueError) as e:
+ logger.error(f"Failed to serialize foreign_metadata for {data_item.id}: {e}")
+ metadata_json = "{}"
+
document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
id=data_item.id,
title=f"{data_item.name}.{data_item.extension}",
raw_data_location=data_item.raw_data_location,
name=data_item.name,
mime_type=data_item.mime_type,
- foreign_metadata=json.dumps(data_item.foreign_metadata, indent=4),
+ foreign_metadata=metadata_json,
)
Also applies to: 61-61
_metadata={"index_fields": ["text"]}, | ||
) | ||
except Exception as e: | ||
print(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve error handling in DocumentChunk creation.
Replace print statements with proper logging:
- except Exception as e:
- print(e)
+ except Exception as e:
+ logger.error("Failed to create DocumentChunk: %s", str(e))
Also consider:
- Adding more specific exception types
- Including context information in the error message
- Potentially propagating the error up the stack
Committable suggestion skipped: line range outside the PR's diff.
except litellm.exceptions.BadRequestError: | ||
raise EmbeddingException("Failed to index data points.") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling granularity.
The current implementation might miss important OpenAI-specific errors. Consider:
- Restoring specific handling for OpenAIError
- Adding specific error messages for different failure scenarios
- except litellm.exceptions.BadRequestError:
- raise EmbeddingException("Failed to index data points.")
+ except litellm.exceptions.BadRequestError as e:
+ raise EmbeddingException(f"Bad request while indexing data points: {str(e)}")
+ except litellm.llms.OpenAI.openai.OpenAIError as e:
+ raise EmbeddingException(f"OpenAI service error: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
except litellm.exceptions.BadRequestError: | |
raise EmbeddingException("Failed to index data points.") | |
except litellm.exceptions.BadRequestError as e: | |
raise EmbeddingException(f"Bad request while indexing data points: {str(e)}") | |
except litellm.llms.OpenAI.openai.OpenAIError as e: | |
raise EmbeddingException(f"OpenAI service error: {str(e)}") |
Rename foreign metadata to external metadata for metadata coming outside of Cognee
…ee into COG-793-metadata-rework
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
cognee/tasks/ingestion/ingest_data.py (1)
98-98
:⚠️ Potential issueFix tuple assignment to external_metadata.
The trailing comma creates an unintended tuple.
-data_point.external_metadata = (get_external_metadata_dict(data_item),) +data_point.external_metadata = get_external_metadata_dict(data_item)
🧹 Nitpick comments (3)
cognee/tasks/ingestion/ingest_data.py (3)
1-18
: Consider reorganizing imports for better readability.Consider organizing imports into standard library, third-party, and local application imports with a blank line between each group:
from typing import Any, List, Union, BinaryIO import inspect import warnings import dlt import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset from cognee.modules.data.models.DatasetData import DatasetData from cognee.modules.users.models import User from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.shared.utils import send_telemetry from .get_dlt_destination import get_dlt_destination from .save_data_item_to_storage import save_data_item_to_storage
29-36
: Enhance the helper function with documentation and improved error handling.The function could benefit from:
- A docstring explaining its purpose and return value
- More specific type hints
- A more informative warning message
-def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]: +def get_external_metadata_dict(data_item: Any) -> dict[str, Any]: + """Extract metadata from a data item. + + Args: + data_item: Any object that might have a dict method for metadata extraction + + Returns: + dict: A dictionary containing metadata and origin information, or empty dict if extraction fails + """ if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): return {"metadata": data_item.dict(), "origin": str(type(data_item))} else: warnings.warn( - f"Data of type {type(data_item)}... does not have dict method. Returning empty metadata." + f"Unable to extract metadata from {type(data_item).__name__}: object does not implement dict() method" ) return {}
137-154
: Add error handling for pipeline execution and extract configuration.The pipeline configuration should be handled more robustly:
+def _get_pipeline_config(dialect_name: str) -> tuple[str, str]: + """Get pipeline configuration based on database dialect. + + Returns: + tuple: (dataset_name, table_name) + """ + if dialect_name == "sqlite": + return "main", "file_metadata" + return "public", "file_metadata" if db_engine.engine.dialect.name == "sqlite": - # To use sqlite with dlt dataset_name must be set to "main". - # Sqlite doesn't support schemas - run_info = pipeline.run( - data_resources(file_paths, user), - table_name="file_metadata", - dataset_name="main", - write_disposition="merge", - ) -else: - # Data should be stored in the same schema to allow deduplication - run_info = pipeline.run( - data_resources(file_paths, user), - table_name="file_metadata", - dataset_name="public", - write_disposition="merge", - ) + dataset_name, table_name = _get_pipeline_config(db_engine.engine.dialect.name) + try: + run_info = pipeline.run( + data_resources(file_paths, user), + table_name=table_name, + dataset_name=dataset_name, + write_disposition="merge", + ) + except Exception as e: + raise RuntimeError(f"Pipeline execution failed: {str(e)}") from e
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
cognee/modules/data/models/Data.py
(2 hunks)cognee/modules/data/processing/document_types/Document.py
(1 hunks)cognee/tasks/documents/classify_documents.py
(2 hunks)cognee/tasks/ingestion/ingest_data.py
(1 hunks)cognee/tests/integration/documents/AudioDocument_test.py
(1 hunks)cognee/tests/integration/documents/ImageDocument_test.py
(1 hunks)cognee/tests/integration/documents/PdfDocument_test.py
(1 hunks)cognee/tests/integration/documents/TextDocument_test.py
(1 hunks)cognee/tests/integration/documents/UnstructuredDocument_test.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
- cognee/modules/data/processing/document_types/Document.py
- cognee/tests/integration/documents/ImageDocument_test.py
- cognee/tests/integration/documents/AudioDocument_test.py
- cognee/modules/data/models/Data.py
- cognee/tests/integration/documents/TextDocument_test.py
- cognee/tasks/documents/classify_documents.py
- cognee/tests/integration/documents/UnstructuredDocument_test.py
- cognee/tests/integration/documents/PdfDocument_test.py
⏰ Context from checks skipped due to timeout of 90000ms (19)
- GitHub Check: run_multimedia_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: run_simple_example_test / test
- GitHub Check: run_dynamic_steps_example_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: windows-latest
- GitHub Check: test
- GitHub Check: test
- GitHub Check: profiler
- GitHub Check: docker-compose-test
data = [data] | ||
|
||
async with db_engine.get_async_session() as session: | ||
# Read metadata stored with dlt | ||
files_metadata = await db_engine.get_all_data_from_table(table_name, dataset_name) | ||
for file_metadata in files_metadata: | ||
from sqlalchemy import select | ||
from cognee.modules.data.models import Data | ||
file_paths = [] | ||
|
||
dataset = await create_dataset(dataset_name, user.id, session) | ||
# Process data | ||
for data_item in data: | ||
file_path = await save_data_item_to_storage(data_item, dataset_name) | ||
|
||
data = ( | ||
await session.execute(select(Data).filter(Data.id == UUID(file_metadata["id"]))) | ||
).scalar_one_or_none() | ||
file_paths.append(file_path) | ||
|
||
if data is not None: | ||
data.name = file_metadata["name"] | ||
data.raw_data_location = file_metadata["file_path"] | ||
data.extension = file_metadata["extension"] | ||
data.mime_type = file_metadata["mime_type"] | ||
# Ingest data and add metadata | ||
with open(file_path.replace("file://", ""), mode="rb") as file: | ||
classified_data = ingestion.classify(file) | ||
|
||
# data_id is the hash of file contents + owner id to avoid duplicate data | ||
data_id = ingestion.identify(classified_data, user) | ||
|
||
file_metadata = classified_data.get_metadata() | ||
|
||
from sqlalchemy import select | ||
|
||
from cognee.modules.data.models import Data | ||
|
||
db_engine = get_relational_engine() | ||
|
||
async with db_engine.get_async_session() as session: | ||
dataset = await create_dataset(dataset_name, user.id, session) | ||
|
||
# Check to see if data should be updated | ||
data_point = ( | ||
await session.execute(select(Data).filter(Data.id == data_id)) | ||
).scalar_one_or_none() | ||
|
||
if data_point is not None: | ||
data_point.name = file_metadata["name"] | ||
data_point.raw_data_location = file_metadata["file_path"] | ||
data_point.extension = file_metadata["extension"] | ||
data_point.mime_type = file_metadata["mime_type"] | ||
data_point.owner_id = user.id | ||
data_point.content_hash = file_metadata["content_hash"] | ||
data_point.external_metadata = (get_external_metadata_dict(data_item),) | ||
await session.merge(data_point) | ||
else: | ||
data_point = Data( | ||
id=data_id, | ||
name=file_metadata["name"], | ||
raw_data_location=file_metadata["file_path"], | ||
extension=file_metadata["extension"], | ||
mime_type=file_metadata["mime_type"], | ||
owner_id=user.id, | ||
content_hash=file_metadata["content_hash"], | ||
external_metadata=get_external_metadata_dict(data_item), | ||
) | ||
|
||
# Check if data is already in dataset | ||
dataset_data = ( | ||
await session.execute( | ||
select(DatasetData).filter( | ||
DatasetData.data_id == data_id, DatasetData.dataset_id == dataset.id | ||
) | ||
) | ||
).scalar_one_or_none() | ||
# If data is not present in dataset add it | ||
if dataset_data is None: | ||
dataset.data.append(data_point) | ||
|
||
await session.merge(data) | ||
await session.commit() | ||
else: | ||
data = Data( | ||
id=UUID(file_metadata["id"]), | ||
name=file_metadata["name"], | ||
raw_data_location=file_metadata["file_path"], | ||
extension=file_metadata["extension"], | ||
mime_type=file_metadata["mime_type"], | ||
) | ||
|
||
dataset.data.append(data) | ||
await session.commit() | ||
|
||
await give_permission_on_document(user, UUID(file_metadata["id"]), "read") | ||
await give_permission_on_document(user, UUID(file_metadata["id"]), "write") | ||
await give_permission_on_document(user, data_id, "read") | ||
await give_permission_on_document(user, data_id, "write") | ||
return file_paths |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor long function into smaller, focused functions.
The data_storing
function is too complex and handles multiple responsibilities. Consider breaking it down into smaller functions:
- Extract database operations:
async def _update_data_point(data_point: Data, file_metadata: dict, user: User, data_item: Any) -> None:
data_point.name = file_metadata["name"]
data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"]
data_point.owner_id = user.id
data_point.content_hash = file_metadata["content_hash"]
data_point.external_metadata = get_external_metadata_dict(data_item)
async def _create_data_point(data_id: str, file_metadata: dict, user: User, data_item: Any) -> Data:
return Data(
id=data_id,
name=file_metadata["name"],
raw_data_location=file_metadata["file_path"],
extension=file_metadata["extension"],
mime_type=file_metadata["mime_type"],
owner_id=user.id,
content_hash=file_metadata["content_hash"],
external_metadata=get_external_metadata_dict(data_item)
)
- Add error handling:
async def data_storing(data: Any, dataset_name: str, user: User):
if not isinstance(data, list):
data = [data]
file_paths = []
for data_item in data:
- file_path = await save_data_item_to_storage(data_item, dataset_name)
- file_paths.append(file_path)
+ try:
+ file_path = await save_data_item_to_storage(data_item, dataset_name)
+ file_paths.append(file_path)
+ except Exception as e:
+ raise RuntimeError(f"Failed to save data item to storage: {str(e)}") from e
async with db_engine.get_async_session() as session:
+ try:
dataset = await create_dataset(dataset_name, user.id, session)
# ... rest of the database operations ...
await session.commit()
+ except Exception as e:
+ await session.rollback()
+ raise RuntimeError(f"Database operation failed: {str(e)}") from e
@dlt.resource(standalone=True, primary_key="id", merge_key="id") | ||
async def data_resources(file_paths: List[str], user: User): | ||
for file_path in file_paths: | ||
with open(file_path.replace("file://", ""), mode="rb") as file: | ||
classified_data = ingestion.classify(file) | ||
data_id = ingestion.identify(classified_data) | ||
data_id = ingestion.identify(classified_data, user) | ||
file_metadata = classified_data.get_metadata() | ||
yield { | ||
"id": data_id, | ||
"name": file_metadata["name"], | ||
"file_path": file_metadata["file_path"], | ||
"extension": file_metadata["extension"], | ||
"mime_type": file_metadata["mime_type"], | ||
"content_hash": file_metadata["content_hash"], | ||
"owner_id": str(user.id), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for file operations.
The function should handle potential file operation errors and validate inputs:
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
async def data_resources(file_paths: List[str], user: User):
+ if not file_paths:
+ raise ValueError("file_paths cannot be empty")
+
for file_path in file_paths:
- with open(file_path.replace("file://", ""), mode="rb") as file:
- classified_data = ingestion.classify(file)
- data_id = ingestion.identify(classified_data, user)
- file_metadata = classified_data.get_metadata()
- yield {
- "id": data_id,
- "name": file_metadata["name"],
- "file_path": file_metadata["file_path"],
- "extension": file_metadata["extension"],
- "mime_type": file_metadata["mime_type"],
- "content_hash": file_metadata["content_hash"],
- "owner_id": str(user.id),
- }
+ try:
+ with open(file_path.replace("file://", ""), mode="rb") as file:
+ classified_data = ingestion.classify(file)
+ data_id = ingestion.identify(classified_data, user)
+ file_metadata = classified_data.get_metadata()
+ yield {
+ "id": data_id,
+ "name": file_metadata["name"],
+ "file_path": file_metadata["file_path"],
+ "extension": file_metadata["extension"],
+ "mime_type": file_metadata["mime_type"],
+ "content_hash": file_metadata["content_hash"],
+ "owner_id": str(user.id),
+ }
+ except FileNotFoundError as e:
+ raise ValueError(f"File not found: {file_path}") from e
+ except Exception as e:
+ raise RuntimeError(f"Error processing file {file_path}: {str(e)}") from e
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
@dlt.resource(standalone=True, primary_key="id", merge_key="id") | |
async def data_resources(file_paths: List[str], user: User): | |
for file_path in file_paths: | |
with open(file_path.replace("file://", ""), mode="rb") as file: | |
classified_data = ingestion.classify(file) | |
data_id = ingestion.identify(classified_data) | |
data_id = ingestion.identify(classified_data, user) | |
file_metadata = classified_data.get_metadata() | |
yield { | |
"id": data_id, | |
"name": file_metadata["name"], | |
"file_path": file_metadata["file_path"], | |
"extension": file_metadata["extension"], | |
"mime_type": file_metadata["mime_type"], | |
"content_hash": file_metadata["content_hash"], | |
"owner_id": str(user.id), | |
} | |
@dlt.resource(standalone=True, primary_key="id", merge_key="id") | |
async def data_resources(file_paths: List[str], user: User): | |
if not file_paths: | |
raise ValueError("file_paths cannot be empty") | |
for file_path in file_paths: | |
try: | |
with open(file_path.replace("file://", ""), mode="rb") as file: | |
classified_data = ingestion.classify(file) | |
data_id = ingestion.identify(classified_data, user) | |
file_metadata = classified_data.get_metadata() | |
yield { | |
"id": data_id, | |
"name": file_metadata["name"], | |
"file_path": file_metadata["file_path"], | |
"extension": file_metadata["extension"], | |
"mime_type": file_metadata["mime_type"], | |
"content_hash": file_metadata["content_hash"], | |
"owner_id": str(user.id), | |
} | |
except FileNotFoundError as e: | |
raise ValueError(f"File not found: {file_path}") from e | |
except Exception as e: | |
raise RuntimeError(f"Error processing file {file_path}: {str(e)}") from e |
Description
Simplify Foreign Metadata handling in Cognee
Show Foreign Metadata in graphs as part of the Document node
Remove unused methods and files related to metadata ingestion
Picture showing foreign metadata in graph property of document node:
Foreign metadata refers to metadata related do documents that were not processed by Cognee, for example if using LlamaIndex to handle documents
DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin
Summary by CodeRabbit
Release Notes
New Features
Breaking Changes
metadata_id
withexternal_metadata
across document types.Improvements
Refactoring