diff --git a/.github/workflows/test_deduplication.yml b/.github/workflows/test_deduplication.yml new file mode 100644 index 000000000..924aab130 --- /dev/null +++ b/.github/workflows/test_deduplication.yml @@ -0,0 +1,69 @@ +name: test | deduplication + +on: + workflow_dispatch: + pull_request: + branches: + - main + types: [labeled, synchronize] + + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +env: + RUNTIME__LOG_LEVEL: ERROR + +jobs: + get_docs_changes: + name: docs changes + uses: ./.github/workflows/get_docs_changes.yml + + run_deduplication_test: + name: test + needs: get_docs_changes + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' && ${{ github.event.label.name == 'run-checks' }} + runs-on: ubuntu-latest + defaults: + run: + shell: bash + services: + postgres: + image: pgvector/pgvector:pg17 + env: + POSTGRES_USER: cognee + POSTGRES_PASSWORD: cognee + POSTGRES_DB: cognee_db + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Check out + uses: actions/checkout@master + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.11.x' + + - name: Install Poetry + uses: snok/install-poetry@v1.3.2 + with: + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + - name: Install dependencies + run: poetry install -E postgres --no-interaction + + - name: Run deduplication test + env: + ENV: 'dev' + LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }} + run: poetry run python ./cognee/tests/test_deduplication.py diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 10430ed8d..39ee01964 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -33,11 +33,11 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam # data is text else: - file_path = save_data_to_file(data, dataset_name) + file_path = save_data_to_file(data) return await add([file_path], dataset_name) if hasattr(data, "file"): - file_path = save_data_to_file(data.file, dataset_name, filename = data.filename) + file_path = save_data_to_file(data.file, filename = data.filename) return await add([file_path], dataset_name) # data is a list of file paths or texts @@ -45,13 +45,13 @@ async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_nam for data_item in data: if hasattr(data_item, "file"): - file_paths.append(save_data_to_file(data_item, dataset_name, filename = data_item.filename)) + file_paths.append(save_data_to_file(data_item, filename = data_item.filename)) elif isinstance(data_item, str) and ( data_item.startswith("/") or data_item.startswith("file://") ): file_paths.append(data_item) elif isinstance(data_item, str): - file_paths.append(save_data_to_file(data_item, dataset_name)) + file_paths.append(save_data_to_file(data_item)) if len(file_paths) > 0: return await add_files(file_paths, dataset_name, user) diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py index a114ef48f..89c3d6d8e 100644 --- a/cognee/infrastructure/files/utils/get_file_metadata.py +++ b/cognee/infrastructure/files/utils/get_file_metadata.py @@ -1,5 +1,7 @@ from typing import BinaryIO, TypedDict +import hashlib from .guess_file_type import guess_file_type +from cognee.shared.utils import get_file_content_hash class FileMetadata(TypedDict): @@ -7,10 +9,14 @@ 
class FileMetadata(TypedDict): file_path: str mime_type: str extension: str + content_hash: str def get_file_metadata(file: BinaryIO) -> FileMetadata: """Get metadata from a file""" file.seek(0) + content_hash = get_file_content_hash(file) + file.seek(0) + file_type = guess_file_type(file) file_path = file.name @@ -21,4 +27,5 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata: file_path = file_path, mime_type = file_type.mime, extension = file_type.extension, + content_hash = content_hash, ) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index f1b033dd0..e24bc7c5c 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -1,7 +1,6 @@ from datetime import datetime, timezone from typing import List from uuid import uuid4 - from sqlalchemy import UUID, Column, DateTime, String from sqlalchemy.orm import Mapped, relationship @@ -19,6 +18,8 @@ class Data(Base): extension = Column(String) mime_type = Column(String) raw_data_location = Column(String) + owner_id = Column(UUID, index=True) + content_hash = Column(String) created_at = Column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) ) diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py index 43031cdc9..67c8c0e45 100644 --- a/cognee/modules/data/operations/write_metadata.py +++ b/cognee/modules/data/operations/write_metadata.py @@ -2,7 +2,6 @@ import json import re import warnings -from typing import Any from uuid import UUID from sqlalchemy import select from typing import Any, BinaryIO, Union diff --git a/cognee/modules/ingestion/data_types/BinaryData.py b/cognee/modules/ingestion/data_types/BinaryData.py index 6959eb15f..0606250ea 100644 --- a/cognee/modules/ingestion/data_types/BinaryData.py +++ b/cognee/modules/ingestion/data_types/BinaryData.py @@ -17,7 +17,7 @@ def __init__(self, data: BinaryIO, name: str = None): def get_identifier(self): metadata = self.get_metadata() - return self.name + "." 
+ metadata["extension"] + return metadata["content_hash"] def get_metadata(self): self.ensure_metadata() diff --git a/cognee/modules/ingestion/identify.py b/cognee/modules/ingestion/identify.py index 745aab913..977ff3f0b 100644 --- a/cognee/modules/ingestion/identify.py +++ b/cognee/modules/ingestion/identify.py @@ -1,7 +1,11 @@ from uuid import uuid5, NAMESPACE_OID from .data_types import IngestionData -def identify(data: IngestionData) -> str: - data_id: str = data.get_identifier() +from cognee.modules.users.models import User - return uuid5(NAMESPACE_OID, data_id) + +def identify(data: IngestionData, user: User) -> str: + data_content_hash: str = data.get_identifier() + + # return UUID hash of file contents + owner id + return uuid5(NAMESPACE_OID, f"{data_content_hash}{user.id}") diff --git a/cognee/modules/ingestion/save_data_to_file.py b/cognee/modules/ingestion/save_data_to_file.py index 1bbfaec37..1af6ab0aa 100644 --- a/cognee/modules/ingestion/save_data_to_file.py +++ b/cognee/modules/ingestion/save_data_to_file.py @@ -1,25 +1,28 @@ -import string -import random +import os.path +import hashlib from typing import BinaryIO, Union from cognee.base_config import get_base_config from cognee.infrastructure.files.storage import LocalStorage from .classify import classify -def save_data_to_file(data: Union[str, BinaryIO], dataset_name: str, filename: str = None): +def save_data_to_file(data: Union[str, BinaryIO], filename: str = None): base_config = get_base_config() data_directory_path = base_config.data_root_directory classified_data = classify(data, filename) - storage_path = data_directory_path + "/" + dataset_name.replace(".", "/") + storage_path = os.path.join(data_directory_path, "data") LocalStorage.ensure_directory_exists(storage_path) file_metadata = classified_data.get_metadata() if "name" not in file_metadata or file_metadata["name"] is None: - letters = string.ascii_lowercase - random_string = "".join(random.choice(letters) for _ in range(32)) - file_metadata["name"] = "text_" + random_string + ".txt" + data_contents = classified_data.get_data().encode('utf-8') + hash_contents = hashlib.md5(data_contents).hexdigest() + file_metadata["name"] = "text_" + hash_contents + ".txt" file_name = file_metadata["name"] - LocalStorage(storage_path).store(file_name, classified_data.get_data()) + + # Don't save file if it already exists + if not os.path.isfile(os.path.join(storage_path, file_name)): + LocalStorage(storage_path).store(file_name, classified_data.get_data()) return "file://" + storage_path + "/" + file_name diff --git a/cognee/shared/exceptions/__init__.py b/cognee/shared/exceptions/__init__.py new file mode 100644 index 000000000..9b86cccab --- /dev/null +++ b/cognee/shared/exceptions/__init__.py @@ -0,0 +1,9 @@ +""" +Custom exceptions for the Cognee API. 
+ +This module defines a set of exceptions for handling various shared utility errors +""" + +from .exceptions import ( + IngestionError, +) \ No newline at end of file diff --git a/cognee/shared/exceptions/exceptions.py b/cognee/shared/exceptions/exceptions.py new file mode 100644 index 000000000..101711398 --- /dev/null +++ b/cognee/shared/exceptions/exceptions.py @@ -0,0 +1,11 @@ +from cognee.exceptions import CogneeApiError +from fastapi import status + +class IngestionError(CogneeApiError): + def __init__( + self, + message: str = "Failed to load data.", + name: str = "IngestionError", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ): + super().__init__(message, name, status_code) \ No newline at end of file diff --git a/cognee/shared/utils.py b/cognee/shared/utils.py index 315e234f1..b75076e55 100644 --- a/cognee/shared/utils.py +++ b/cognee/shared/utils.py @@ -1,6 +1,9 @@ """ This module contains utility functions for the cognee. """ import os +from typing import BinaryIO, Union + import requests +import hashlib from datetime import datetime, timezone import graphistry import networkx as nx @@ -16,6 +19,8 @@ from uuid import uuid4 import pathlib +from cognee.shared.exceptions import IngestionError + # Analytics Proxy Url, currently hosted by Vercel proxy_url = "https://test.prometh.ai" @@ -70,6 +75,29 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int: num_tokens = len(encoding.encode(string)) return num_tokens +def get_file_content_hash(file_obj: Union[str, BinaryIO]) -> str: + h = hashlib.md5() + + try: + if isinstance(file_obj, str): + with open(file_obj, 'rb') as file: + while True: + # Reading is buffered, so we can read smaller chunks. + chunk = file.read(h.block_size) + if not chunk: + break + h.update(chunk) + else: + while True: + # Reading is buffered, so we can read smaller chunks. 
+ chunk = file_obj.read(h.block_size) + if not chunk: + break + h.update(chunk) + + return h.hexdigest() + except IOError as e: + raise IngestionError(message=f"Failed to load data from {file_obj}: {e}") def trim_text_to_max_tokens(text: str, max_tokens: int, encoding_name: str) -> str: """ diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index 0a238b548..c6b42f482 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -1,10 +1,10 @@ -from typing import Any +from typing import Any, List import dlt import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset -from cognee.modules.data.operations.delete_metadata import delete_metadata +from cognee.modules.data.models.DatasetData import DatasetData from cognee.modules.users.models import User from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.shared.utils import send_telemetry @@ -23,12 +23,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination = destination, ) - @dlt.resource(standalone=True, merge_key="id") - async def data_resources(file_paths: str): + @dlt.resource(standalone=True, primary_key="id", merge_key="id") + async def data_resources(file_paths: List[str], user: User): for file_path in file_paths: with open(file_path.replace("file://", ""), mode="rb") as file: classified_data = ingestion.classify(file) - data_id = ingestion.identify(classified_data) + data_id = ingestion.identify(classified_data, user) file_metadata = classified_data.get_metadata() yield { "id": data_id, @@ -36,6 +36,8 @@ async def data_resources(file_paths: str): "file_path": file_metadata["file_path"], "extension": file_metadata["extension"], "mime_type": file_metadata["mime_type"], + "content_hash": file_metadata["content_hash"], + "owner_id": str(user.id), } async def data_storing(data: Any, dataset_name: str, user: User): @@ -57,7 +59,8 @@ async def data_storing(data: Any, dataset_name: str, user: User): with open(file_path.replace("file://", ""), mode = "rb") as file: classified_data = ingestion.classify(file) - data_id = ingestion.identify(classified_data) + # data_id is the hash of file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) file_metadata = classified_data.get_metadata() @@ -70,6 +73,7 @@ async def data_storing(data: Any, dataset_name: str, user: User): async with db_engine.get_async_session() as session: dataset = await create_dataset(dataset_name, user.id, session) + # Check to see if data should be updated data_point = ( await session.execute(select(Data).filter(Data.id == data_id)) ).scalar_one_or_none() @@ -79,6 +83,8 @@ async def data_storing(data: Any, dataset_name: str, user: User): data_point.raw_data_location = file_metadata["file_path"] data_point.extension = file_metadata["extension"] data_point.mime_type = file_metadata["mime_type"] + data_point.owner_id = user.id + data_point.content_hash = file_metadata["content_hash"] await session.merge(data_point) else: data_point = Data( @@ -86,10 +92,20 @@ async def data_storing(data: Any, dataset_name: str, user: User): name = file_metadata["name"], raw_data_location = file_metadata["file_path"], extension = file_metadata["extension"], - mime_type = file_metadata["mime_type"] + mime_type =
file_metadata["mime_type"], + owner_id = user.id, + content_hash = file_metadata["content_hash"], ) + # Check if data is already in dataset + dataset_data = ( + await session.execute(select(DatasetData).filter(DatasetData.data_id == data_id, + DatasetData.dataset_id == dataset.id)) + ).scalar_one_or_none() + # If data is not present in dataset add it + if dataset_data is None: dataset.data.append(data_point) + await session.commit() await write_metadata(data_item, data_point.id, file_metadata) @@ -109,16 +125,17 @@ async def data_storing(data: Any, dataset_name: str, user: User): # To use sqlite with dlt dataset_name must be set to "main". # Sqlite doesn't support schemas run_info = pipeline.run( - data_resources(file_paths), + data_resources(file_paths, user), table_name="file_metadata", dataset_name="main", write_disposition="merge", ) else: + # Data should be stored in the same schema to allow deduplication run_info = pipeline.run( - data_resources(file_paths), + data_resources(file_paths, user), table_name="file_metadata", - dataset_name=dataset_name, + dataset_name="public", write_disposition="merge", ) diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index e2a7c8ee7..88d499e74 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -7,7 +7,7 @@ def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str # data is a file object coming from upload. if hasattr(data_item, "file"): - file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename) + file_path = save_data_to_file(data_item.file, filename=data_item.filename) elif isinstance(data_item, str): # data is a file path @@ -15,7 +15,7 @@ def save_data_item_to_storage(data_item: Union[BinaryIO, str], dataset_name: str file_path = data_item.replace("file://", "") # data is text else: - file_path = save_data_to_file(data_item, dataset_name) + file_path = save_data_to_file(data_item) else: raise IngestionError(message=f"Data type not supported: {type(data_item)}") diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index d758ebcd1..06dde11bd 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -17,7 +17,7 @@ async def save_data_item_with_metadata_to_storage( # data is a file object coming from upload. 
elif hasattr(data_item, "file"): file_path = save_data_to_file( - data_item.file, dataset_name, filename=data_item.filename + data_item.file, filename=data_item.filename ) elif isinstance(data_item, str): @@ -26,7 +26,7 @@ async def save_data_item_with_metadata_to_storage( file_path = data_item.replace("file://", "") # data is text else: - file_path = save_data_to_file(data_item, dataset_name) + file_path = save_data_to_file(data_item) else: raise IngestionError(message=f"Data type not supported: {type(data_item)}") diff --git a/cognee/tasks/ingestion/transform_data.py b/cognee/tasks/ingestion/transform_data.py index c2ea86c47..898ac6e71 100644 --- a/cognee/tasks/ingestion/transform_data.py +++ b/cognee/tasks/ingestion/transform_data.py @@ -8,11 +8,11 @@ def get_data_from_llama_index(data_point: Union[Document, ImageDocument], datase if type(data_point) == Document: file_path = data_point.metadata.get("file_path") if file_path is None: - file_path = save_data_to_file(data_point.text, dataset_name) + file_path = save_data_to_file(data_point.text) return file_path return file_path elif type(data_point) == ImageDocument: if data_point.image_path is None: - file_path = save_data_to_file(data_point.text, dataset_name) + file_path = save_data_to_file(data_point.text) return file_path return data_point.image_path \ No newline at end of file diff --git a/cognee/tests/test_data/Natural_language_processing_copy.txt b/cognee/tests/test_data/Natural_language_processing_copy.txt new file mode 100644 index 000000000..a6fad3b47 --- /dev/null +++ b/cognee/tests/test_data/Natural_language_processing_copy.txt @@ -0,0 +1,2 @@ +Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval. It is primarily concerned with giving computers the ability to support and manipulate human language. It involves processing natural language datasets, such as text corpora or speech corpora, using either rule-based or probabilistic (i.e. statistical and, most recently, neural network-based) machine learning approaches. The goal is a computer capable of "understanding"[citation needed] the contents of documents, including the contextual nuances of the language within them. To this end, natural language processing often borrows ideas from theoretical linguistics. The technology can then accurately extract information and insights contained in the documents as well as categorize and organize the documents themselves. +Challenges in natural language processing frequently involve speech recognition, natural-language understanding, and natural-language generation. 
diff --git a/cognee/tests/test_data/example.png b/cognee/tests/test_data/example.png new file mode 100644 index 000000000..4d406cafd Binary files /dev/null and b/cognee/tests/test_data/example.png differ diff --git a/cognee/tests/test_data/example_copy.png b/cognee/tests/test_data/example_copy.png new file mode 100644 index 000000000..4d406cafd Binary files /dev/null and b/cognee/tests/test_data/example_copy.png differ diff --git a/cognee/tests/test_data/text_to_speech.mp3 b/cognee/tests/test_data/text_to_speech.mp3 new file mode 100644 index 000000000..e84aea505 Binary files /dev/null and b/cognee/tests/test_data/text_to_speech.mp3 differ diff --git a/cognee/tests/test_data/text_to_speech_copy.mp3 b/cognee/tests/test_data/text_to_speech_copy.mp3 new file mode 100644 index 000000000..e84aea505 Binary files /dev/null and b/cognee/tests/test_data/text_to_speech_copy.mp3 differ diff --git a/cognee/tests/test_deduplication.py b/cognee/tests/test_deduplication.py new file mode 100644 index 000000000..467a52368 --- /dev/null +++ b/cognee/tests/test_deduplication.py @@ -0,0 +1,160 @@ +import hashlib +import os +import logging +import pathlib + +import cognee +from cognee.infrastructure.databases.relational import get_relational_engine + +logging.basicConfig(level=logging.DEBUG) + +async def test_deduplication(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + relational_engine = get_relational_engine() + + dataset_name = "test_deduplication" + dataset_name2 = "test_deduplication2" + + # Test deduplication of local files + explanation_file_path = os.path.join( + pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" + ) + explanation_file_path2 = os.path.join( + pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt" + ) + await cognee.add([explanation_file_path], dataset_name) + await cognee.add([explanation_file_path2], dataset_name2) + + result = await relational_engine.get_all_data_from_table("data") + assert len(result) == 1, "More than one data entity was found." + assert result[0]["name"] == "Natural_language_processing_copy", "Result name does not match expected value." + + result = await relational_engine.get_all_data_from_table("datasets") + assert len(result) == 2, "Unexpected number of datasets found." + assert result[0]["name"] == dataset_name, "Result name does not match expected value." + assert result[1]["name"] == dataset_name2, "Result name does not match expected value." + + result = await relational_engine.get_all_data_from_table("dataset_data") + assert len(result) == 2, "Unexpected number of dataset data relationships found." + assert result[0]["data_id"] == result[1]["data_id"], "Data item is not reused between datasets." + assert result[0]["dataset_id"] != result[1]["dataset_id"], "Dataset items are not different." + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + # Test deduplication of text input + text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena. + At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states. 
+ Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible. + The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly. + Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate. + In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited. + """ + + await cognee.add([text], dataset_name) + await cognee.add([text], dataset_name2) + + result = await relational_engine.get_all_data_from_table("data") + assert len(result) == 1, "More than one data entity was found." + assert hashlib.md5(text.encode('utf-8')).hexdigest() in result[0]["name"], "Content hash is not a part of file name." 
+ + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + # Test deduplication of image files + explanation_file_path = os.path.join( + pathlib.Path(__file__).parent, "test_data/example.png" + ) + explanation_file_path2 = os.path.join( + pathlib.Path(__file__).parent, "test_data/example_copy.png" + ) + + await cognee.add([explanation_file_path], dataset_name) + await cognee.add([explanation_file_path2], dataset_name2) + + result = await relational_engine.get_all_data_from_table("data") + assert len(result) == 1, "More than one data entity was found." + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + # Test deduplication of sound files + explanation_file_path = os.path.join( + pathlib.Path(__file__).parent, "test_data/text_to_speech.mp3" + ) + explanation_file_path2 = os.path.join( + pathlib.Path(__file__).parent, "test_data/text_to_speech_copy.mp3" + ) + + await cognee.add([explanation_file_path], dataset_name) + await cognee.add([explanation_file_path2], dataset_name2) + + result = await relational_engine.get_all_data_from_table("data") + assert len(result) == 1, "More than one data entity was found." + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + +async def test_deduplication_postgres(): + cognee.config.set_vector_db_config( + { + "vector_db_url": "", + "vector_db_key": "", + "vector_db_provider": "pgvector" + } + ) + cognee.config.set_relational_db_config( + { + "db_name": "cognee_db", + "db_host": "127.0.0.1", + "db_port": "5432", + "db_username": "cognee", + "db_password": "cognee", + "db_provider": "postgres", + } + ) + + await test_deduplication() + +async def test_deduplication_sqlite(): + cognee.config.set_vector_db_config( + { + "vector_db_url": "", + "vector_db_key": "", + "vector_db_provider": "lancedb" + } + ) + cognee.config.set_relational_db_config( + { + "db_provider": "sqlite", + } + ) + + await test_deduplication() + + +async def main(): + + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_deduplication") + ).resolve() + ) + cognee.config.data_root_directory(data_directory_path) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_deduplication") + ).resolve() + ) + cognee.config.system_root_directory(cognee_directory_path) + + await test_deduplication_postgres() + await test_deduplication_sqlite() + +if __name__ == "__main__": + import asyncio + + asyncio.run(main())
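Appended note (not part of the diff): the sketch below is a minimal, self-contained illustration of the deduplication key this change set introduces. It mirrors cognee.shared.utils.get_file_content_hash and cognee.modules.ingestion.identify without importing cognee; the file paths reuse the test fixtures added above, and the owner UUID is a hypothetical placeholder.

import hashlib
from uuid import UUID, uuid5, NAMESPACE_OID

def content_hash(path: str) -> str:
    # Stream the file in binary chunks and hash the bytes, as get_file_content_hash does.
    digest = hashlib.md5()
    with open(path, "rb") as file_obj:
        for chunk in iter(lambda: file_obj.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()

def data_id(path: str, owner_id: UUID) -> UUID:
    # Same construction as identify(): a UUID5 over "<content hash><owner id>", so two
    # byte-identical files owned by the same user collapse to a single Data row.
    return uuid5(NAMESPACE_OID, f"{content_hash(path)}{owner_id}")

if __name__ == "__main__":
    owner = UUID("00000000-0000-0000-0000-000000000001")  # hypothetical owner id
    original = data_id("cognee/tests/test_data/example.png", owner)
    copy = data_id("cognee/tests/test_data/example_copy.png", owner)
    assert original == copy, "byte-identical files should map to the same data id"

Because the id is derived from content and owner rather than from the dataset, the test above can assert a single row in the data table while dataset_data still links both datasets to that one row.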