-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate graphiti's temporal awareness functionality as Tasks #253
Changes from all commits
dd94781
0d2a9e9
5d71059
cedb9d6
b571fb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .build_graph_with_temporal_awareness import \ | ||
build_graph_with_temporal_awareness | ||
from .search_graph_with_temporal_awareness import \ | ||
search_graph_with_temporal_awareness |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import os | ||
from datetime import datetime | ||
|
||
from graphiti_core import Graphiti | ||
from graphiti_core.nodes import EpisodeType | ||
|
||
|
||
async def build_graph_with_temporal_awareness(text_list): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add type hints and documentation. The function lacks type hints and documentation, which are essential for maintainability and usability. -async def build_graph_with_temporal_awareness(text_list):
+async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+ """Build a temporal awareness graph from a list of texts.
+
+ Args:
+ text_list: List of text strings to be added as episodes.
+
+ Returns:
+ Graphiti: Initialized graphiti instance with added episodes.
+
+ Raises:
+ ConnectionError: If unable to connect to Neo4j database.
+ """
|
||
|
||
url = os.getenv("GRAPH_DATABASE_URL") | ||
password = os.getenv("GRAPH_DATABASE_PASSWORD") | ||
graphiti = Graphiti(url, "neo4j", password) | ||
|
||
await graphiti.build_indices_and_constraints() | ||
print("Graph database initialized.") | ||
|
||
for i, text in enumerate(text_list): | ||
await graphiti.add_episode( | ||
name=f"episode_{i}", | ||
episode_body=text, | ||
source=EpisodeType.text, | ||
source_description="input", | ||
reference_time=datetime.now() | ||
) | ||
print(f"Added text: {text[:35]}...") | ||
Comment on lines
+17
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve temporal awareness implementation. The current implementation uses the same timestamp (current time) for all episodes, which might not be accurate for temporal awareness. Consider accepting a reference time parameter or extracting temporal information from the text. -async def build_graph_with_temporal_awareness(text_list: list[str]) -> Graphiti:
+async def build_graph_with_temporal_awareness(
+ text_list: list[str],
+ reference_times: list[datetime] | None = None
+) -> Graphiti:
# ... initialization code ...
for i, text in enumerate(text_list):
+ reference_time = (
+ reference_times[i] if reference_times
+ else datetime.now()
+ )
await graphiti.add_episode(
name=f"episode_{i}",
episode_body=text,
source=EpisodeType.text,
source_description="input",
- reference_time=datetime.now()
+ reference_time=reference_time
)
|
||
return graphiti |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,6 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def search_graph_with_temporal_awareness(graphiti, query): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
search_result = await graphiti.search(query) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await graphiti.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return search_result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+3
to
+6
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add error handling, type hints, and documentation The function needs several improvements for robustness and maintainability: Consider this enhanced implementation: -async def search_graph_with_temporal_awareness(graphiti, query):
- search_result = await graphiti.search(query)
- await graphiti.close()
- return search_result
+from typing import Any, TypeVar
+
+T = TypeVar('T')
+
+async def search_graph_with_temporal_awareness(graphiti: Any, query: str) -> T:
+ """Search the temporal awareness graph with the given query.
+
+ Args:
+ graphiti: The graphiti instance to use for searching
+ query: The search query string
+
+ Returns:
+ The search result from graphiti
+
+ Raises:
+ Exception: If the search operation fails
+ """
+ try:
+ search_result = await graphiti.search(query)
+ return search_result
+ except Exception as e:
+ raise Exception(f"Failed to search graph: {str(e)}") from e
+ finally:
+ try:
+ await graphiti.close()
+ except Exception:
+ # Log error but don't fail the operation
+ pass 📝 Committable suggestion
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import asyncio | ||
|
||
import cognee | ||
from cognee.api.v1.search import SearchType | ||
from cognee.modules.pipelines import Task, run_tasks | ||
from cognee.tasks.temporal_awareness import ( | ||
build_graph_with_temporal_awareness, search_graph_with_temporal_awareness) | ||
|
||
text_list = [ | ||
"Kamala Harris is the Attorney General of California. She was previously " | ||
"the district attorney for San Francisco.", | ||
"As AG, Harris was in office from January 3, 2011 – January 3, 2017", | ||
] | ||
|
||
async def main(): | ||
|
||
tasks = [ | ||
Task(build_graph_with_temporal_awareness, text_list=text_list), | ||
Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?') | ||
] | ||
Comment on lines
+17
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve task dependency management and error handling The second task depends on the result of the first task, but this dependency isn't properly managed. Consider this improved implementation: - tasks = [
- Task(build_graph_with_temporal_awareness, text_list=text_list),
- Task(search_graph_with_temporal_awareness, query='Who was the California Attorney General?')
- ]
+ try:
+ graphiti = await build_graph_with_temporal_awareness(text_list=text_list)
+ tasks = [
+ Task(search_graph_with_temporal_awareness,
+ graphiti=graphiti,
+ query='Who was the California Attorney General?')
+ ]
+ except Exception as e:
+ print(f"Failed to build graph: {e}")
+ return
|
||
|
||
pipeline = run_tasks(tasks) | ||
|
||
async for result in pipeline: | ||
print(result) | ||
|
||
|
||
if __name__ == '__main__': | ||
asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
The code correctly uses the Graphiti API with proper initialization and method calls (
Graphiti()
constructor,build_indices_and_constraints()
,add_episode()
), and theEpisodeType
enum is used appropriately. However, thegraphiti-core
dependency is not specified in any requirements file, which could lead to installation issues.graphiti-core
to your project's requirements file to ensure proper dependency management🔗 Analysis chain
Verify Graphiti API usage.
Let's verify that we're using the correct Graphiti API methods and that all required dependencies are properly specified in the project's requirements.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 609
Script:
Length of output: 1097