-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate graphiti's temporal awareness functionality as Tasks #253
Conversation
Caution Review failedThe pull request is closed. WalkthroughThis pull request introduces several changes to the Changes
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (6)
examples/python/graphiti_example.py (4)
4-4
: Remove unused importThe
SearchType
import is not used in the code.-from cognee.api.v1.search import SearchType
9-13
: Consider moving test data to a separate fileThe example text data should be moved to a separate data file for better maintainability and reusability.
Create a new file
examples/python/data/sample_texts.py
:KAMALA_HARRIS_TIMELINE = [ "Kamala Harris is the Attorney General of California. She was previously " "the district attorney for San Francisco.", "As AG, Harris was in office from January 3, 2011 – January 3, 2017", ]Then import and use it in the example:
-text_list = [ - "Kamala Harris is the Attorney General of California. She was previously " - "the district attorney for San Francisco.", - "As AG, Harris was in office from January 3, 2011 – January 3, 2017", -] +from data.sample_texts import KAMALA_HARRIS_TIMELINE + +text_list = KAMALA_HARRIS_TIMELINE
24-25
: Improve result handling and presentationThe results are printed without context, making it difficult to understand the output.
- async for result in pipeline: - print(result) + try: + async for result in pipeline: + print(f"Search Result: {result}") + except Exception as e: + print(f"Pipeline execution failed: {e}")
28-29
: Add proper script execution handlingThe main execution block should handle keyboard interrupts and provide proper exit codes.
if __name__ == '__main__': - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nOperation cancelled by user") + exit(1) + except Exception as e: + print(f"Execution failed: {e}") + exit(1)cognee/tasks/temporal_awareness/build_graph_with_temporal_awareness.py (2)
13-21
: Add batch processing capability for better performance.Consider implementing batch processing for multiple episodes to improve performance with large text lists.
+ BATCH_SIZE = 100 + for i in range(0, len(text_list), BATCH_SIZE): + batch = text_list[i:i + BATCH_SIZE] + tasks = [ + graphiti.add_episode( + name=f"episode_{i+j}", + episode_body=text, + source=EpisodeType.text, + source_description="input", + reference_time=datetime.now() + ) + for j, text in enumerate(batch) + ] + await asyncio.gather(*tasks) + print(f"Added batch of {len(batch)} episodes...")
21-21
: Improve logging mechanism.Replace print statements with proper logging to enable better monitoring and debugging.
+import logging + +logger = logging.getLogger(__name__) + async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti: # ... initialization code ... - print("Graph database initialized.") + logger.info("Graph database initialized") for i, text in enumerate(text_list): # ... episode addition code ... - print(f"Added text: {text[:35]}...") + logger.debug(f"Added episode_{i}: {text[:35]}...")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
cognee/tasks/temporal_awareness/__init__.py
(1 hunks)cognee/tasks/temporal_awareness/build_graph_with_temporal_awareness.py
(1 hunks)cognee/tasks/temporal_awareness/search_graph_with_temporal_awareness.py
(1 hunks)examples/python/graphiti_example.py
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- cognee/tasks/temporal_awareness/init.py
🔇 Additional comments (1)
cognee/tasks/temporal_awareness/build_graph_with_temporal_awareness.py (1)
1-5
: LGTM! Imports are well-organized and necessary.
The imports are correctly separated between standard library and third-party packages.
async def search_graph_with_temporal_awareness(graphiti, query): | ||
search_result = await graphiti.search(query) | ||
await graphiti.close() | ||
return search_result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling, type hints, and documentation
The function needs several improvements for robustness and maintainability:
Consider this enhanced implementation:
-async def search_graph_with_temporal_awareness(graphiti, query):
- search_result = await graphiti.search(query)
- await graphiti.close()
- return search_result
+from typing import Any, TypeVar
+
+T = TypeVar('T')
+
+async def search_graph_with_temporal_awareness(graphiti: Any, query: str) -> T:
+ """Search the temporal awareness graph with the given query.
+
+ Args:
+ graphiti: The graphiti instance to use for searching
+ query: The search query string
+
+ Returns:
+ The search result from graphiti
+
+ Raises:
+ Exception: If the search operation fails
+ """
+ try:
+ search_result = await graphiti.search(query)
+ return search_result
+ except Exception as e:
+ raise Exception(f"Failed to search graph: {str(e)}") from e
+ finally:
+ try:
+ await graphiti.close()
+ except Exception:
+ # Log error but don't fail the operation
+ pass
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def search_graph_with_temporal_awareness(graphiti, query): | |
search_result = await graphiti.search(query) | |
await graphiti.close() | |
return search_result | |
from typing import Any, TypeVar | |
T = TypeVar('T') | |
async def search_graph_with_temporal_awareness(graphiti: Any, query: str) -> T: | |
"""Search the temporal awareness graph with the given query. | |
Args: | |
graphiti: The graphiti instance to use for searching | |
query: The search query string | |
Returns: | |
The search result from graphiti | |
Raises: | |
Exception: If the search operation fails | |
""" | |
try: | |
search_result = await graphiti.search(query) | |
return search_result | |
except Exception as e: | |
raise Exception(f"Failed to search graph: {str(e)}") from e | |
finally: | |
try: | |
await graphiti.close() | |
except Exception: | |
# Log error but don't fail the operation | |
pass |
tasks = [ | ||
Task(build_graph_with_temporal_awareness, text_list=text_list), | ||
Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?') | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve task dependency management and error handling
The second task depends on the result of the first task, but this dependency isn't properly managed.
Consider this improved implementation:
- tasks = [
- Task(build_graph_with_temporal_awareness, text_list=text_list),
- Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?')
- ]
+ try:
+ graphiti = await build_graph_with_temporal_awareness(text_list=text_list)
+ tasks = [
+ Task(search_graph_with_temporal_awareness,
+ graphiti=graphiti,
+ query='Who was the California Attorney General?')
+ ]
+ except Exception as e:
+ print(f"Failed to build graph: {e}")
+ return
Committable suggestion skipped: line range outside the PR's diff.
graphiti = Graphiti("bolt://localhost:7687", "neo4j", "pleaseletmein") | ||
await graphiti.build_indices_and_constraints() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Remove hardcoded credentials and add error handling.
Hardcoded credentials pose a security risk and should be moved to environment variables. Additionally, database operations should include proper error handling.
+import os
+from graphiti_core.exceptions import GraphitiConnectionError
async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
- graphiti = Graphiti("bolt://localhost:7687", "neo4j", "pleaseletmein")
- await graphiti.build_indices_and_constraints()
+ try:
+ graphiti = Graphiti(
+ os.getenv("NEO4J_URI", "bolt://localhost:7687"),
+ os.getenv("NEO4J_USER", "neo4j"),
+ os.getenv("NEO4J_PASSWORD")
+ )
+ await graphiti.build_indices_and_constraints()
+ except Exception as e:
+ raise GraphitiConnectionError(f"Failed to initialize graph database: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
graphiti = Graphiti("bolt://localhost:7687", "neo4j", "pleaseletmein") | |
await graphiti.build_indices_and_constraints() | |
try: | |
graphiti = Graphiti( | |
os.getenv("NEO4J_URI", "bolt://localhost:7687"), | |
os.getenv("NEO4J_USER", "neo4j"), | |
os.getenv("NEO4J_PASSWORD") | |
) | |
await graphiti.build_indices_and_constraints() | |
except Exception as e: | |
raise GraphitiConnectionError(f"Failed to initialize graph database: {str(e)}") |
from graphiti_core.nodes import EpisodeType | ||
|
||
|
||
async def build_graph_with_temporal_awareness(text_list): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add type hints and documentation.
The function lacks type hints and documentation, which are essential for maintainability and usability.
-async def build_graph_with_temporal_awareness(text_list):
+async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+ """Build a temporal awareness graph from a list of texts.
+
+ Args:
+ text_list: List of text strings to be added as episodes.
+
+ Returns:
+ Graphiti: Initialized graphiti instance with added episodes.
+
+ Raises:
+ ConnectionError: If unable to connect to Neo4j database.
+ """
Committable suggestion skipped: line range outside the PR's diff.
for i, text in enumerate(text_list): | ||
await graphiti.add_episode( | ||
name=f"episode_{i}", | ||
episode_body=text, | ||
source=EpisodeType.text, | ||
source_description="input", | ||
reference_time=datetime.now() | ||
) | ||
print(f"Added text: {text[:35]}...") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve temporal awareness implementation.
The current implementation uses the same timestamp (current time) for all episodes, which might not be accurate for temporal awareness. Consider accepting a reference time parameter or extracting temporal information from the text.
-async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+async def build_graph_with_temporal_awareness(
+ text_list: list[str],
+ reference_times: list[datetime] | None = None
+) -> Graphiti:
# ... initialization code ...
for i, text in enumerate(text_list):
+ reference_time = (
+ reference_times[i] if reference_times
+ else datetime.now()
+ )
await graphiti.add_episode(
name=f"episode_{i}",
episode_body=text,
source=EpisodeType.text,
source_description="input",
- reference_time=datetime.now()
+ reference_time=reference_time
)
Committable suggestion skipped: line range outside the PR's diff.
from graphiti_core import Graphiti | ||
from graphiti_core.nodes import EpisodeType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
The code correctly uses the Graphiti API with proper initialization and method calls (Graphiti()
constructor, build_indices_and_constraints()
, add_episode()
), and the EpisodeType
enum is used appropriately. However, the graphiti-core
dependency is not specified in any requirements file, which could lead to installation issues.
- Add
graphiti-core
to your project's requirements file to ensure proper dependency management - Consider externalizing the database connection parameters instead of hardcoding them
🔗 Analysis chain
Verify Graphiti API usage.
Let's verify that we're using the correct Graphiti API methods and that all required dependencies are properly specified in the project's requirements.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify Graphiti dependency and API usage
# Check if graphiti-core is in requirements
fd -t f "requirements.*\.txt" -x grep -l "graphiti-core"
# Look for other Graphiti method usages to ensure consistency
rg "from graphiti_core import|from graphiti_core\." -A 3
Length of output: 609
Script:
#!/bin/bash
# Let's check the actual usage of Graphiti and EpisodeType in the file
ast-grep --pattern 'class $_ {
$$$
Graphiti.$_($$$)
$$$
}'
# Also check direct Graphiti instantiation
ast-grep --pattern 'Graphiti($$$)'
# And check EpisodeType usage
ast-grep --pattern 'EpisodeType.$_'
# Let's also see the full content of the build_graph function to understand the context
rg -A 20 "async def build_graph_with_temporal_awareness" cognee/tasks/temporal_awareness/build_graph_with_temporal_awareness.py
Length of output: 1097
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use variables from our system to pass neo4j credentials instead of hardcoding them
Summary by CodeRabbit
New Features
cognee
library.Bug Fixes
Documentation
Refactor
Style
Tests
Chores
Revert