diff --git a/README.md b/README.md
index 486bfe48..b48711f2 100644
--- a/README.md
+++ b/README.md
@@ -218,6 +218,7 @@ After that, you can run the RAG test manager from your command line.
     --file ".data" \
     --test_set "example_data/test_set.json" \
     --user_id "666" \
+    --params "chunk_size" "search_type" \
     --metadata "example_data/metadata.json" \
     --retriever_type "single_document_context"
 
diff --git a/level_3/Readme.md b/level_3/Readme.md
index 7cc0839e..3987728f 100644
--- a/level_3/Readme.md
+++ b/level_3/Readme.md
@@ -1,193 +1,91 @@
-## PromethAI Memory Manager
+#### Docker:
+Copy the .env.template to .env and fill in the variables.
+Set the environment variable in the .env file to "docker".
-### Description
+Launch the Docker image:
+```docker compose up promethai_mem ```
-RAG test manager can be used via API (in progress) or via the CLI
+Send a request to the API:
-Make sure to run scripts/create_database.py
-
-After that, you can run:
-
-``` python test_runner.py \
-    --url "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" \
-    --test_set "path/to/test_set.json" \
-    --user_id "666" \
-    --metadata "path/to/metadata.json" ```
+curl -X POST -H "Content-Type: application/json" -d '{
+  "payload": {
+    "user_id": "681",
+    "data": [".data/3ZCCCW.pdf"],
+    "test_set": "sample",
+    "params": ["chunk_size"],
+    "metadata": "sample",
+    "retriever_type": "single_document_context"
+  }
+}' http://0.0.0.0:8000/rag-test/rag_test_run
+
+```
+Params:
+- data -> list of URLs or a path to a file located in the .data folder (pdf, docx, txt, html)
+- test_set -> sample, manual (list of questions and answers)
+- metadata -> sample, manual (json) or version (in progress)
+- params -> chunk_size, chunk_overlap, search_type (hybrid, bm25), embeddings
+- retriever_type -> llm_context, single_document_context, multi_document_context, cognitive_architecture (coming soon)
-#How to start
+Inspect the results in the DB:
-## Installation
-```docker compose build promethai_mem ```
+``` docker exec -it postgres psql -U bla ```
-## Run the level 3
+``` \c bubu ```
-Make sure you have Docker, Poetry, and Python 3.11 installed and postgres installed.
+``` select * from test_outputs; ```
-Copy the .env.example to .env and fill the variables
-Start the docker:
+Or set up Superset to visualize the results:
-```docker compose up promethai_mem ```
+#### Poetry environment:
-
+
+Copy the .env.template to .env and fill in the variables.
+Set the environment variable in the .env file to "local".
 Use the poetry environment:
 ``` poetry shell ```
-Make sure to run to initialize DB tables
-
-``` python scripts/create_database.py ```
-
-After that, you can run the RAG test manager.
+Make sure the environment variable in the .env file is set to "local".
+Launch the Postgres DB:
-```
-    python rag_test_manager.py \
-    --url "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" \
-    --test_set "example_data/test_set.json" \
-    --user_id "666" \
-    --metadata "example_data/metadata.json"
+``` docker compose up postgres ```
-```
-Examples of metadata structure and test set are in the folder "example_data"
+Launch Superset:
-To analyze your data, go to your local Superset instance:
+``` docker compose up superset ```
-```
-    http://localhost:8088
-```
+Open Superset in your browser:
+``` http://localhost:8088 ```
 Add the Postgres datasource to the Superset with the following connection string:
-```
-    postgres://bla:bla@postgres:5432/bubu
-```
-
-
-## Clean database
-
-```docker compose down promethai_mem ```
-
+``` postgres://bla:bla@postgres:5432/bubu ```
-```docker volume prune ```
-
-``` docker compose up --force-recreate --build promethai_mem ```
-
-
-## Usage
-
-The fast API endpoint accepts prompts and stores data with the help of the Memory Manager
-
-The types of memory are: Episodic, Semantic, Buffer
-
-Endpoint Overview
-The Memory API provides the following endpoints:
-
-- /[memory_type]/add-memory (POST)
-- /[memory_type]/fetch-memory (POST)
-- /[memory_type]/delete-memory (POST)
-- /available-buffer-actions (GET)
-- /run-buffer (POST)
-- /buffer/create-context (POST)
+Run the following to initialize the DB tables:
+``` python scripts/create_database.py ```
+After that, you can run the RAG test manager from your command line.
-## How To Get Started
-1. We do a post request to add-memory endpoint with the following payload:
-It will upload Jack London "Call of the Wild" to SEMANTIC memory
-```
-curl -X POST http://localhost:8000/semantic/add-memory -H "Content-Type: application/json" -d '{
-  "payload": {
-    "user_id": "681",
-    "prompt": "I am adding docs",
-    "params": {
-      "version": "1.0",
-      "agreement_id": "AG123456",
-      "privacy_policy": "https://example.com/privacy",
-      "terms_of_service": "https://example.com/terms",
-      "format": "json",
-      "schema_version": "1.1",
-      "checksum": "a1b2c3d4e5f6",
-      "owner": "John Doe",
-      "license": "MIT",
-      "validity_start": "2023-08-01",
-      "validity_end": "2024-07-31"
-    },
-    "loader_settings": {
-      "format": "PDF",
-      "source": "url",
-      "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
-    }
-  }
-}'
-```
-
-2.
We run the buffer with the prompt "I want to know how does Buck adapt to life in the wild and then have that info translated to German " +``` + python rag_test_manager.py \ + --file ".data" \ + --test_set "example_data/test_set.json" \ + --user_id "666" \ + --params "chunk_size" "search_type" \ + --metadata "example_data/metadata.json" \ + --retriever_type "single_document_context" ``` -curl -X POST http://localhost:8000/run-buffer -H "Content-Type: application/json" -d '{ - "payload": { - "user_id": "681", - "prompt": "I want to know how does Buck adapt to life in the wild and then have that info translated to German ", - "params": { - "version": "1.0", - "agreement_id": "AG123456", - "privacy_policy": "https://example.com/privacy", - "terms_of_service": "https://example.com/terms", - "format": "json", - "schema_version": "1.1", - "checksum": "a1b2c3d4e5f6", - "owner": "John Doe", - "license": "MIT", - "validity_start": "2023-08-01", - "validity_end": "2024-07-31" - }, - "attention_modulators": { - "relevance": 0.0, - "saliency": 0.1 - } - } -}' -``` - -Other attention modulators that could be implemented: - - "frequency": 0.5, - "repetition": 0.5, - "length": 0.5, - "position": 0.5, - "context": 0.5, - "emotion": 0.5, - "sentiment": 0.5, - "perspective": 0.5, - "style": 0.5, - "grammar": 0.5, - "spelling": 0.5, - "logic": 0.5, - "coherence": 0.5, - "cohesion": 0.5, - "plausibility": 0.5, - "consistency": 0.5, - "informativeness": 0.5, - "specificity": 0.5, - "detail": 0.5, - "accuracy": 0.5, - "topicality": 0.5, - "focus": 0.5, - "clarity": 0.5, - "simplicity": 0.5, - "naturalness": 0.5, - "fluency": 0.5, - "variety": 0.5, - "vividness": 0.5, - "originality": 0.5, - "creativity": 0.5, - "humor": 0.5, \ No newline at end of file +Examples of metadata structure and test set are in the folder "example_data" diff --git a/level_3/rag_test_manager.py b/level_3/rag_test_manager.py index f9f9f27a..43f8bd6d 100644 --- a/level_3/rag_test_manager.py +++ b/level_3/rag_test_manager.py @@ -373,19 +373,7 @@ def count_files_in_data_folder(data_folder_path=".data"): except Exception as e: print(f"An error occurred: {str(e)}") return -1 # Return -1 to indicate an error -# def data_format_route(data_string: str): -# @ai_classifier -# class FormatRoute(Enum): -# """Represents classifier for the data format""" -# -# PDF = "PDF" -# UNSTRUCTURED_WEB = "UNSTRUCTURED_WEB" -# GITHUB = "GITHUB" -# TEXT = "TEXT" -# CSV = "CSV" -# WIKIPEDIA = "WIKIPEDIA" -# -# return FormatRoute(data_string).name + def data_format_route(data_string: str): @@ -465,6 +453,8 @@ async def start_test( test_set=None, user_id=None, params=None, + param_ranges=None, + param_increments=None, metadata=None, generate_test_set=False, retriever_type: str = None, @@ -506,7 +496,8 @@ async def start_test( logging.info( "Data location is %s", data_location ) - test_params = generate_param_variants(included_params=params) + logging.info("Provided params are %s", str(params)) + test_params = generate_param_variants(included_params=params, increments=param_increments, ranges=param_ranges) logging.info("Here are the test params %s", str(test_params)) @@ -515,201 +506,201 @@ async def start_test( "source": f"{data_location}", "path": data, } - if job_id is None: - job_id = str(uuid.uuid4()) - - await add_entity( - session, - Operation( - id=job_id, - user_id=user_id, - operation_params=str(test_params), - number_of_files=count_files_in_data_folder(), - operation_status = "RUNNING", - operation_type=retriever_type, - test_set_id=test_set_id, - ), - ) - 
doc_names = get_document_names(data) - for doc in doc_names: - - await add_entity( - session, - DocsModel( - id=str(uuid.uuid4()), - operation_id=job_id, - doc_name = doc - ) - ) - - async def run_test( - test, loader_settings, metadata, test_id=None, retriever_type=False - ): - if test_id is None: - test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" - await memory.manage_memory_attributes(existing_user) - test_class = test_id + "_class" - await memory.add_dynamic_memory_class(test_id.lower(), test_id) - dynamic_memory_class = getattr(memory, test_class.lower(), None) - methods_to_add = ["add_memories", "fetch_memories", "delete_memories"] - - if dynamic_memory_class is not None: - for method_name in methods_to_add: - await memory.add_method_to_class(dynamic_memory_class, method_name) - print(f"Memory method {method_name} has been added") - else: - print(f"No attribute named {test_class.lower()} in memory.") - - print(f"Trying to access: {test_class.lower()}") - print("Available memory classes:", await memory.list_memory_classes()) - if test: - loader_settings.update(test) - # Check if the search_type is 'none' - if loader_settings.get('search_type') == 'none': - # Change it to 'hybrid' - loader_settings['search_type'] = 'hybrid' - - test_class = test_id + "_class" - dynamic_memory_class = getattr(memory, test_class.lower(), None) - - async def run_load_test_element( - loader_settings=loader_settings, - metadata=metadata, - test_id=test_id, - test_set=test_set, - ): - print(f"Trying to access: {test_class.lower()}") - await memory.dynamic_method_call( - dynamic_memory_class, - "add_memories", - observation="Observation loaded", - params=metadata, - loader_settings=loader_settings, - ) - return "Loaded test element" - - async def run_search_element(test_item, test_id, search_type="text"): - retrieve_action = await memory.dynamic_method_call( - dynamic_memory_class, - "fetch_memories", - observation=str(test_item["question"]), search_type=loader_settings.get('search_type'), - ) - print( - "Here is the test result", - str(retrieve_action), - ) - if loader_settings.get('search_type') == 'bm25': - return retrieve_action["data"]["Get"][test_id] - else: - return retrieve_action["data"]["Get"][test_id][0]["text"] - - async def run_eval(test_item, search_result): - logging.info("Initiated test set evaluation") - test_eval = await eval_test( - query=str(test_item["question"]), - expected_output=str(test_item["answer"]), - context=str(search_result), - ) - logging.info("Successfully evaluated test set") - return test_eval - - async def run_generate_test_set(test_id): - test_class = test_id + "_class" - # await memory.add_dynamic_memory_class(test_id.lower(), test_id) - dynamic_memory_class = getattr(memory, test_class.lower(), None) - print(dynamic_memory_class) - retrieve_action = await memory.dynamic_method_call( - dynamic_memory_class, - "fetch_memories", - observation="Generate a short summary of this document", - search_type="generative", - ) - return dynamic_test_manager(retrieve_action) - - test_eval_pipeline = [] - if retriever_type == "llm_context": - for test_qa in test_set: - context = "" - logging.info("Loading and evaluating test set for LLM context") - test_result = await run_eval(test_qa, context) - test_eval_pipeline.append(test_result) - elif retriever_type == "single_document_context": - if test_set: - logging.info( - "Loading and evaluating test set for a single document context" - ) - await run_load_test_element( - loader_settings, metadata, test_id, test_set - ) - for 
test_qa in test_set: - result = await run_search_element(test_qa, test_id) - test_result = await run_eval(test_qa, result) - test_result.append(test) - test_eval_pipeline.append(test_result) - await memory.dynamic_method_call( - dynamic_memory_class, "delete_memories", namespace=test_id - ) - else: - pass - if generate_test_set is True: - synthetic_test_set = run_generate_test_set(test_id) - else: - pass - - return test_id, test_eval_pipeline - - results = [] - - logging.info("Validating the retriever type") - - logging.info("Retriever type: %s", retriever_type) - - if retriever_type == "llm_context": - logging.info("Retriever type: llm_context") - test_id, result = await run_test( - test=None, - loader_settings=loader_settings, - metadata=metadata, - retriever_type=retriever_type, - ) # No params for this case - results.append([result, "No params"]) - - elif retriever_type == "single_document_context": - logging.info("Retriever type: single document context") - for param in test_params: - logging.info("Running for chunk size %s", param["chunk_size"]) - test_id, result = await run_test( - param, loader_settings, metadata, retriever_type=retriever_type - ) # Add the params to the result - # result.append(param) - results.append(result) - - for b in results: - logging.info("Loading %s", str(b)) - for result, chunk in b: - logging.info("Loading %s", str(result)) - await add_entity( - session, - TestOutput( - id=test_id, - test_set_id=test_set_id, - operation_id=job_id, - set_id=str(uuid.uuid4()), - user_id=user_id, - test_results=result["success"], - test_score=str(result["score"]), - test_metric_name=result["metric_name"], - test_query=result["query"], - test_output=result["output"], - test_expected_output=str(["expected_output"]), - test_context=result["context"][0], - test_params=str(chunk), # Add params to the database table - ), - ) - - await update_entity(session, Operation, job_id, "COMPLETED") - - return results + # if job_id is None: + # job_id = str(uuid.uuid4()) + # + # await add_entity( + # session, + # Operation( + # id=job_id, + # user_id=user_id, + # operation_params=str(test_params), + # number_of_files=count_files_in_data_folder(), + # operation_status = "RUNNING", + # operation_type=retriever_type, + # test_set_id=test_set_id, + # ), + # ) + # doc_names = get_document_names(data) + # for doc in doc_names: + # + # await add_entity( + # session, + # DocsModel( + # id=str(uuid.uuid4()), + # operation_id=job_id, + # doc_name = doc + # ) + # ) + # + # async def run_test( + # test, loader_settings, metadata, test_id=None, retriever_type=False + # ): + # if test_id is None: + # test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" + # await memory.manage_memory_attributes(existing_user) + # test_class = test_id + "_class" + # await memory.add_dynamic_memory_class(test_id.lower(), test_id) + # dynamic_memory_class = getattr(memory, test_class.lower(), None) + # methods_to_add = ["add_memories", "fetch_memories", "delete_memories"] + # + # if dynamic_memory_class is not None: + # for method_name in methods_to_add: + # await memory.add_method_to_class(dynamic_memory_class, method_name) + # print(f"Memory method {method_name} has been added") + # else: + # print(f"No attribute named {test_class.lower()} in memory.") + # + # print(f"Trying to access: {test_class.lower()}") + # print("Available memory classes:", await memory.list_memory_classes()) + # if test: + # loader_settings.update(test) + # # Check if the search_type is 'none' + # if loader_settings.get('search_type') == 
'none': + # # Change it to 'hybrid' + # loader_settings['search_type'] = 'hybrid' + # + # test_class = test_id + "_class" + # dynamic_memory_class = getattr(memory, test_class.lower(), None) + # + # async def run_load_test_element( + # loader_settings=loader_settings, + # metadata=metadata, + # test_id=test_id, + # test_set=test_set, + # ): + # print(f"Trying to access: {test_class.lower()}") + # await memory.dynamic_method_call( + # dynamic_memory_class, + # "add_memories", + # observation="Observation loaded", + # params=metadata, + # loader_settings=loader_settings, + # ) + # return "Loaded test element" + # + # async def run_search_element(test_item, test_id, search_type="text"): + # retrieve_action = await memory.dynamic_method_call( + # dynamic_memory_class, + # "fetch_memories", + # observation=str(test_item["question"]), search_type=loader_settings.get('search_type'), + # ) + # print( + # "Here is the test result", + # str(retrieve_action), + # ) + # if loader_settings.get('search_type') == 'bm25': + # return retrieve_action["data"]["Get"][test_id] + # else: + # return retrieve_action["data"]["Get"][test_id][0]["text"] + # + # async def run_eval(test_item, search_result): + # logging.info("Initiated test set evaluation") + # test_eval = await eval_test( + # query=str(test_item["question"]), + # expected_output=str(test_item["answer"]), + # context=str(search_result), + # ) + # logging.info("Successfully evaluated test set") + # return test_eval + # + # async def run_generate_test_set(test_id): + # test_class = test_id + "_class" + # # await memory.add_dynamic_memory_class(test_id.lower(), test_id) + # dynamic_memory_class = getattr(memory, test_class.lower(), None) + # print(dynamic_memory_class) + # retrieve_action = await memory.dynamic_method_call( + # dynamic_memory_class, + # "fetch_memories", + # observation="Generate a short summary of this document", + # search_type="generative", + # ) + # return dynamic_test_manager(retrieve_action) + # + # test_eval_pipeline = [] + # if retriever_type == "llm_context": + # for test_qa in test_set: + # context = "" + # logging.info("Loading and evaluating test set for LLM context") + # test_result = await run_eval(test_qa, context) + # test_eval_pipeline.append(test_result) + # elif retriever_type == "single_document_context": + # if test_set: + # logging.info( + # "Loading and evaluating test set for a single document context" + # ) + # await run_load_test_element( + # loader_settings, metadata, test_id, test_set + # ) + # for test_qa in test_set: + # result = await run_search_element(test_qa, test_id) + # test_result = await run_eval(test_qa, result) + # test_result.append(test) + # test_eval_pipeline.append(test_result) + # await memory.dynamic_method_call( + # dynamic_memory_class, "delete_memories", namespace=test_id + # ) + # else: + # pass + # if generate_test_set is True: + # synthetic_test_set = run_generate_test_set(test_id) + # else: + # pass + # + # return test_id, test_eval_pipeline + # + # results = [] + # + # logging.info("Validating the retriever type") + # + # logging.info("Retriever type: %s", retriever_type) + # + # if retriever_type == "llm_context": + # logging.info("Retriever type: llm_context") + # test_id, result = await run_test( + # test=None, + # loader_settings=loader_settings, + # metadata=metadata, + # retriever_type=retriever_type, + # ) # No params for this case + # results.append([result, "No params"]) + # + # elif retriever_type == "single_document_context": + # logging.info("Retriever type: single 
document context") + # for param in test_params: + # logging.info("Running for chunk size %s", param["chunk_size"]) + # test_id, result = await run_test( + # param, loader_settings, metadata, retriever_type=retriever_type + # ) # Add the params to the result + # # result.append(param) + # results.append(result) + # + # for b in results: + # logging.info("Loading %s", str(b)) + # for result, chunk in b: + # logging.info("Loading %s", str(result)) + # await add_entity( + # session, + # TestOutput( + # id=test_id, + # test_set_id=test_set_id, + # operation_id=job_id, + # set_id=str(uuid.uuid4()), + # user_id=user_id, + # test_results=result["success"], + # test_score=str(result["score"]), + # test_metric_name=result["metric_name"], + # test_query=result["query"], + # test_output=result["output"], + # test_expected_output=str(["expected_output"]), + # test_context=result["context"][0], + # test_params=str(chunk), # Add params to the database table + # ), + # ) + # + # await update_entity(session, Operation, job_id, "COMPLETED") + # + # return results async def main(): @@ -761,7 +752,9 @@ async def main(): parser.add_argument("--file", nargs="+", required=True, help="List of file paths to test.") parser.add_argument("--test_set", required=True, help="Path to JSON file containing the test set.") parser.add_argument("--user_id", required=True, help="User ID.") - parser.add_argument("--params", help="Additional parameters in JSON format.") + parser.add_argument("--params", nargs="+", help="Additional parameters in JSON format.") + parser.add_argument("--param_ranges", required=False, help="Param ranges") + parser.add_argument("--param_increments", required=False, help="Increment values for for example chunks") parser.add_argument("--metadata", required=True, help="Path to JSON file containing metadata.") # parser.add_argument("--generate_test_set", required=False, help="Make a test set.") parser.add_argument("--retriever_type", required=False, help="Do a test only within the existing LLM context") @@ -786,18 +779,15 @@ async def main(): return if args.params: - try: - params = json.loads(args.params) - if not isinstance(params, dict): - raise TypeError("Parsed params JSON is not a dictionary.") - except json.JSONDecodeError as e: - print(f"Error parsing params: {str(e)}") - return - else: - params = None - logging.info("Args datatype is", type(args.file)) + params = args.params + if not isinstance(params, list): + raise TypeError("Parsed params JSON is not a list.") + + else: + params = None + logging.info("Args datatype is", type(args.file)) #clean up params here - await start_test(data=args.file, test_set=test_set, user_id= args.user_id, params= params, metadata =metadata, retriever_type=args.retriever_type) + await start_test(data=args.file, test_set=test_set, user_id= args.user_id, params= args.params, param_ranges=args.param_ranges, param_increments=args.param_increments, metadata =metadata, retriever_type=args.retriever_type) if __name__ == "__main__":