diff --git a/level_3/rag_test_manager.py b/level_3/rag_test_manager.py
index 43f8bd6d..2eb9dd9d 100644
--- a/level_3/rag_test_manager.py
+++ b/level_3/rag_test_manager.py
@@ -506,201 +506,201 @@ async def start_test(
         "source": f"{data_location}",
         "path": data,
     }
-    # if job_id is None:
-    #     job_id = str(uuid.uuid4())
-    #
-    # await add_entity(
-    #     session,
-    #     Operation(
-    #         id=job_id,
-    #         user_id=user_id,
-    #         operation_params=str(test_params),
-    #         number_of_files=count_files_in_data_folder(),
-    #         operation_status = "RUNNING",
-    #         operation_type=retriever_type,
-    #         test_set_id=test_set_id,
-    #     ),
-    # )
-    # doc_names = get_document_names(data)
-    # for doc in doc_names:
-    #
-    #     await add_entity(
-    #         session,
-    #         DocsModel(
-    #             id=str(uuid.uuid4()),
-    #             operation_id=job_id,
-    #             doc_name = doc
-    #         )
-    #     )
-    #
-    # async def run_test(
-    #     test, loader_settings, metadata, test_id=None, retriever_type=False
-    # ):
-    #     if test_id is None:
-    #         test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
-    #         await memory.manage_memory_attributes(existing_user)
-    #         test_class = test_id + "_class"
-    #         await memory.add_dynamic_memory_class(test_id.lower(), test_id)
-    #         dynamic_memory_class = getattr(memory, test_class.lower(), None)
-    #         methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
-    #
-    #         if dynamic_memory_class is not None:
-    #             for method_name in methods_to_add:
-    #                 await memory.add_method_to_class(dynamic_memory_class, method_name)
-    #                 print(f"Memory method {method_name} has been added")
-    #         else:
-    #             print(f"No attribute named {test_class.lower()} in memory.")
-    #
-    #         print(f"Trying to access: {test_class.lower()}")
-    #         print("Available memory classes:", await memory.list_memory_classes())
-    #     if test:
-    #         loader_settings.update(test)
-    #         # Check if the search_type is 'none'
-    #         if loader_settings.get('search_type') == 'none':
-    #             # Change it to 'hybrid'
-    #             loader_settings['search_type'] = 'hybrid'
-    #
-    #     test_class = test_id + "_class"
-    #     dynamic_memory_class = getattr(memory, test_class.lower(), None)
-    #
-    #     async def run_load_test_element(
-    #         loader_settings=loader_settings,
-    #         metadata=metadata,
-    #         test_id=test_id,
-    #         test_set=test_set,
-    #     ):
-    #         print(f"Trying to access: {test_class.lower()}")
-    #         await memory.dynamic_method_call(
-    #             dynamic_memory_class,
-    #             "add_memories",
-    #             observation="Observation loaded",
-    #             params=metadata,
-    #             loader_settings=loader_settings,
-    #         )
-    #         return "Loaded test element"
-    #
-    #     async def run_search_element(test_item, test_id, search_type="text"):
-    #         retrieve_action = await memory.dynamic_method_call(
-    #             dynamic_memory_class,
-    #             "fetch_memories",
-    #             observation=str(test_item["question"]), search_type=loader_settings.get('search_type'),
-    #         )
-    #         print(
-    #             "Here is the test result",
-    #             str(retrieve_action),
-    #         )
-    #         if loader_settings.get('search_type') == 'bm25':
-    #             return retrieve_action["data"]["Get"][test_id]
-    #         else:
-    #             return retrieve_action["data"]["Get"][test_id][0]["text"]
-    #
-    #     async def run_eval(test_item, search_result):
-    #         logging.info("Initiated test set evaluation")
-    #         test_eval = await eval_test(
-    #             query=str(test_item["question"]),
-    #             expected_output=str(test_item["answer"]),
-    #             context=str(search_result),
-    #         )
-    #         logging.info("Successfully evaluated test set")
-    #         return test_eval
-    #
-    #     async def run_generate_test_set(test_id):
-    #         test_class = test_id + "_class"
-    #         # await memory.add_dynamic_memory_class(test_id.lower(), test_id)
-    #         dynamic_memory_class = getattr(memory, test_class.lower(), None)
-    #         print(dynamic_memory_class)
-    #         retrieve_action = await memory.dynamic_method_call(
-    #             dynamic_memory_class,
-    #             "fetch_memories",
-    #             observation="Generate a short summary of this document",
-    #             search_type="generative",
-    #         )
-    #         return dynamic_test_manager(retrieve_action)
-    #
-    #     test_eval_pipeline = []
-    #     if retriever_type == "llm_context":
-    #         for test_qa in test_set:
-    #             context = ""
-    #             logging.info("Loading and evaluating test set for LLM context")
-    #             test_result = await run_eval(test_qa, context)
-    #             test_eval_pipeline.append(test_result)
-    #     elif retriever_type == "single_document_context":
-    #         if test_set:
-    #             logging.info(
-    #                 "Loading and evaluating test set for a single document context"
-    #             )
-    #             await run_load_test_element(
-    #                 loader_settings, metadata, test_id, test_set
-    #             )
-    #             for test_qa in test_set:
-    #                 result = await run_search_element(test_qa, test_id)
-    #                 test_result = await run_eval(test_qa, result)
-    #                 test_result.append(test)
-    #                 test_eval_pipeline.append(test_result)
-    #             await memory.dynamic_method_call(
-    #                 dynamic_memory_class, "delete_memories", namespace=test_id
-    #             )
-    #         else:
-    #             pass
-    #     if generate_test_set is True:
-    #         synthetic_test_set = run_generate_test_set(test_id)
-    #     else:
-    #         pass
-    #
-    #     return test_id, test_eval_pipeline
-    #
-    # results = []
-    #
-    # logging.info("Validating the retriever type")
-    #
-    # logging.info("Retriever type: %s", retriever_type)
-    #
-    # if retriever_type == "llm_context":
-    #     logging.info("Retriever type: llm_context")
-    #     test_id, result = await run_test(
-    #         test=None,
-    #         loader_settings=loader_settings,
-    #         metadata=metadata,
-    #         retriever_type=retriever_type,
-    #     )  # No params for this case
-    #     results.append([result, "No params"])
-    #
-    # elif retriever_type == "single_document_context":
-    #     logging.info("Retriever type: single document context")
-    #     for param in test_params:
-    #         logging.info("Running for chunk size %s", param["chunk_size"])
-    #         test_id, result = await run_test(
-    #             param, loader_settings, metadata, retriever_type=retriever_type
-    #         )  # Add the params to the result
-    #         # result.append(param)
-    #         results.append(result)
-    #
-    # for b in results:
-    #     logging.info("Loading %s", str(b))
-    #     for result, chunk in b:
-    #         logging.info("Loading %s", str(result))
-    #         await add_entity(
-    #             session,
-    #             TestOutput(
-    #                 id=test_id,
-    #                 test_set_id=test_set_id,
-    #                 operation_id=job_id,
-    #                 set_id=str(uuid.uuid4()),
-    #                 user_id=user_id,
-    #                 test_results=result["success"],
-    #                 test_score=str(result["score"]),
-    #                 test_metric_name=result["metric_name"],
-    #                 test_query=result["query"],
-    #                 test_output=result["output"],
-    #                 test_expected_output=str(["expected_output"]),
-    #                 test_context=result["context"][0],
-    #                 test_params=str(chunk),  # Add params to the database table
-    #             ),
-    #         )
-    #
-    # await update_entity(session, Operation, job_id, "COMPLETED")
-    #
-    # return results
+    if job_id is None:
+        job_id = str(uuid.uuid4())
+
+    await add_entity(
+        session,
+        Operation(
+            id=job_id,
+            user_id=user_id,
+            operation_params=str(test_params),
+            number_of_files=count_files_in_data_folder(),
+            operation_status="RUNNING",
+            operation_type=retriever_type,
+            test_set_id=test_set_id,
+        ),
+    )
+    doc_names = get_document_names(data)
+    for doc in doc_names:
+        await add_entity(
+            session,
+            DocsModel(
+                id=str(uuid.uuid4()),
+                operation_id=job_id,
+                doc_name=doc,
+            ),
+        )
+
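+    # run_test provisions a per-run dynamic memory class and, depending on the
+    # retriever type, either evaluates the test set against an empty LLM context
+    # or loads the document, retrieves an answer per question, and evaluates each.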
+    async def run_test(
+        test, loader_settings, metadata, test_id=None, retriever_type=False
+    ):
+        if test_id is None:
+            test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
+            await memory.manage_memory_attributes(existing_user)
+            test_class = test_id + "_class"
+            await memory.add_dynamic_memory_class(test_id.lower(), test_id)
+            dynamic_memory_class = getattr(memory, test_class.lower(), None)
+            methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
+
+            if dynamic_memory_class is not None:
+                for method_name in methods_to_add:
+                    await memory.add_method_to_class(dynamic_memory_class, method_name)
+                    print(f"Memory method {method_name} has been added")
+            else:
+                print(f"No attribute named {test_class.lower()} in memory.")
+
+            print(f"Trying to access: {test_class.lower()}")
+            print("Available memory classes:", await memory.list_memory_classes())
+        if test:
+            loader_settings.update(test)
+            # Fall back from a search_type of 'none' to 'hybrid'
+            if loader_settings.get('search_type') == 'none':
+                loader_settings['search_type'] = 'hybrid'
+
+        test_class = test_id + "_class"
+        dynamic_memory_class = getattr(memory, test_class.lower(), None)
+
+        async def run_load_test_element(
+            loader_settings=loader_settings,
+            metadata=metadata,
+            test_id=test_id,
+            test_set=test_set,
+        ):
+            print(f"Trying to access: {test_class.lower()}")
+            await memory.dynamic_method_call(
+                dynamic_memory_class,
+                "add_memories",
+                observation="Observation loaded",
+                params=metadata,
+                loader_settings=loader_settings,
+            )
+            return "Loaded test element"
+
+        async def run_search_element(test_item, test_id, search_type="text"):
+            retrieve_action = await memory.dynamic_method_call(
+                dynamic_memory_class,
+                "fetch_memories",
+                observation=str(test_item["question"]),
+                search_type=loader_settings.get('search_type'),
+            )
+            print(
+                "Here is the test result",
+                str(retrieve_action),
+            )
+            # bm25 responses carry the raw hit list; other search types nest the text
+            if loader_settings.get('search_type') == 'bm25':
+                return retrieve_action["data"]["Get"][test_id]
+            else:
+                return retrieve_action["data"]["Get"][test_id][0]["text"]
+
+        async def run_eval(test_item, search_result):
+            logging.info("Initiated test set evaluation")
+            test_eval = await eval_test(
+                query=str(test_item["question"]),
+                expected_output=str(test_item["answer"]),
+                context=str(search_result),
+            )
+            logging.info("Successfully evaluated test set")
+            return test_eval
+
+        async def run_generate_test_set(test_id):
+            test_class = test_id + "_class"
+            dynamic_memory_class = getattr(memory, test_class.lower(), None)
+            print(dynamic_memory_class)
+            retrieve_action = await memory.dynamic_method_call(
+                dynamic_memory_class,
+                "fetch_memories",
+                observation="Generate a short summary of this document",
+                search_type="generative",
+            )
+            return dynamic_test_manager(retrieve_action)
+
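+        # Dispatch on retriever type: "llm_context" scores answers against an
+        # empty context, while "single_document_context" loads the document,
+        # retrieves a result per question, and deletes the namespace when done.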
+        test_eval_pipeline = []
+        if retriever_type == "llm_context":
+            for test_qa in test_set:
+                context = ""
+                logging.info("Loading and evaluating test set for LLM context")
+                test_result = await run_eval(test_qa, context)
+                test_eval_pipeline.append(test_result)
+        elif retriever_type == "single_document_context":
+            if test_set:
+                logging.info(
+                    "Loading and evaluating test set for a single document context"
+                )
+                await run_load_test_element(
+                    loader_settings, metadata, test_id, test_set
+                )
+                for test_qa in test_set:
+                    result = await run_search_element(test_qa, test_id)
+                    test_result = await run_eval(test_qa, result)
+                    test_result.append(test)
+                    test_eval_pipeline.append(test_result)
+                await memory.dynamic_method_call(
+                    dynamic_memory_class, "delete_memories", namespace=test_id
+                )
+            else:
+                pass
+        if generate_test_set is True:
+            synthetic_test_set = await run_generate_test_set(test_id)
+        else:
+            pass
+
+        return test_id, test_eval_pipeline
+
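+    # Run the test matrix for the selected retriever type and persist every
+    # evaluation row, then mark the operation as finished.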
+    results = []
+
+    logging.info("Validating the retriever type")
+    logging.info("Retriever type: %s", retriever_type)
+
+    if retriever_type == "llm_context":
+        logging.info("Retriever type: llm_context")
+        test_id, result = await run_test(
+            test=None,
+            loader_settings=loader_settings,
+            metadata=metadata,
+            retriever_type=retriever_type,
+        )  # No params for this case
+        results.append([result, "No params"])
+
+    elif retriever_type == "single_document_context":
+        logging.info("Retriever type: single document context")
+        for param in test_params:
+            logging.info("Running for chunk size %s", param["chunk_size"])
+            test_id, result = await run_test(
+                param, loader_settings, metadata, retriever_type=retriever_type
+            )  # Add the params to the result
+            results.append(result)
+
+    for b in results:
+        logging.info("Loading %s", str(b))
+        for result, chunk in b:
+            logging.info("Loading %s", str(result))
+            await add_entity(
+                session,
+                TestOutput(
+                    id=test_id,
+                    test_set_id=test_set_id,
+                    operation_id=job_id,
+                    set_id=str(uuid.uuid4()),
+                    user_id=user_id,
+                    test_results=result["success"],
+                    test_score=str(result["score"]),
+                    test_metric_name=result["metric_name"],
+                    test_query=result["query"],
+                    test_output=result["output"],
+                    test_expected_output=str(result["expected_output"]),
+                    test_context=result["context"][0],
+                    test_params=str(chunk),  # Add params to the database table
+                ),
+            )
+
+    await update_entity(session, Operation, job_id, "COMPLETED")
+
+    return results


 async def main():