Update docs, fix issue with params
Vasilije1990 committed Oct 30, 2023
1 parent 57ca73c, commit 552a8e6
Showing 1 changed file with 195 additions and 195 deletions.
390 changes: 195 additions & 195 deletions in level_3/rag_test_manager.py
@@ -506,201 +506,201 @@ async def start_test(
"source": f"{data_location}",
"path": data,
}
if job_id is None:
job_id = str(uuid.uuid4())

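    # Register this test run as an Operation in RUNNING state before any processing starts.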
await add_entity(
session,
Operation(
id=job_id,
user_id=user_id,
operation_params=str(test_params),
number_of_files=count_files_in_data_folder(),
            operation_status="RUNNING",
operation_type=retriever_type,
test_set_id=test_set_id,
),
)
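    # Record each document found in the data location against this operation.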
doc_names = get_document_names(data)
for doc in doc_names:

await add_entity(
session,
DocsModel(
id=str(uuid.uuid4()),
operation_id=job_id,
                doc_name=doc
)
)

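    # Run one test configuration end to end: set up a dynamic memory class, load the data,
    # retrieve answers for the test set, and evaluate them.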
async def run_test(
test, loader_settings, metadata, test_id=None, retriever_type=False
):
if test_id is None:
test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
await memory.manage_memory_attributes(existing_user)
test_class = test_id + "_class"
await memory.add_dynamic_memory_class(test_id.lower(), test_id)
dynamic_memory_class = getattr(memory, test_class.lower(), None)
methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]

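        # Attach the standard memory methods (add/fetch/delete) to the dynamically created class.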
if dynamic_memory_class is not None:
for method_name in methods_to_add:
await memory.add_method_to_class(dynamic_memory_class, method_name)
print(f"Memory method {method_name} has been added")
else:
print(f"No attribute named {test_class.lower()} in memory.")

print(f"Trying to access: {test_class.lower()}")
print("Available memory classes:", await memory.list_memory_classes())
if test:
loader_settings.update(test)
# Check if the search_type is 'none'
if loader_settings.get('search_type') == 'none':
# Change it to 'hybrid'
loader_settings['search_type'] = 'hybrid'

test_class = test_id + "_class"
dynamic_memory_class = getattr(memory, test_class.lower(), None)

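        # Load the document into the namespace created for this test run.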
async def run_load_test_element(
loader_settings=loader_settings,
metadata=metadata,
test_id=test_id,
test_set=test_set,
):
print(f"Trying to access: {test_class.lower()}")
await memory.dynamic_method_call(
dynamic_memory_class,
"add_memories",
observation="Observation loaded",
params=metadata,
loader_settings=loader_settings,
)
return "Loaded test element"

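        # Fetch memories for one question; bm25 responses are shaped differently from vector/hybrid ones.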
async def run_search_element(test_item, test_id, search_type="text"):
retrieve_action = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
                observation=str(test_item["question"]),
                search_type=loader_settings.get('search_type'),
)
print(
"Here is the test result",
str(retrieve_action),
)
if loader_settings.get('search_type') == 'bm25':
return retrieve_action["data"]["Get"][test_id]
else:
return retrieve_action["data"]["Get"][test_id][0]["text"]

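        # Evaluate one question/answer pair against the retrieved context.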
async def run_eval(test_item, search_result):
logging.info("Initiated test set evaluation")
test_eval = await eval_test(
query=str(test_item["question"]),
expected_output=str(test_item["answer"]),
context=str(search_result),
)
logging.info("Successfully evaluated test set")
return test_eval

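        # Build a synthetic test set from a generative summary of the document.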
async def run_generate_test_set(test_id):
test_class = test_id + "_class"
# await memory.add_dynamic_memory_class(test_id.lower(), test_id)
dynamic_memory_class = getattr(memory, test_class.lower(), None)
print(dynamic_memory_class)
retrieve_action = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
observation="Generate a short summary of this document",
search_type="generative",
)
return dynamic_test_manager(retrieve_action)

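        # Run retrieval and evaluation for the selected retriever type.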
test_eval_pipeline = []
if retriever_type == "llm_context":
for test_qa in test_set:
context = ""
logging.info("Loading and evaluating test set for LLM context")
test_result = await run_eval(test_qa, context)
test_eval_pipeline.append(test_result)
elif retriever_type == "single_document_context":
if test_set:
logging.info(
"Loading and evaluating test set for a single document context"
)
await run_load_test_element(
loader_settings, metadata, test_id, test_set
)
for test_qa in test_set:
result = await run_search_element(test_qa, test_id)
test_result = await run_eval(test_qa, result)
test_result.append(test)
test_eval_pipeline.append(test_result)
await memory.dynamic_method_call(
dynamic_memory_class, "delete_memories", namespace=test_id
)
else:
pass
        if generate_test_set is True:
            # Note: the synthetic test set is generated here but not persisted.
            synthetic_test_set = await run_generate_test_set(test_id)

return test_id, test_eval_pipeline

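    # Execute the configured tests and collect their results.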
results = []

logging.info("Validating the retriever type")

logging.info("Retriever type: %s", retriever_type)

if retriever_type == "llm_context":
logging.info("Retriever type: llm_context")
test_id, result = await run_test(
test=None,
loader_settings=loader_settings,
metadata=metadata,
retriever_type=retriever_type,
) # No params for this case
results.append([result, "No params"])

elif retriever_type == "single_document_context":
logging.info("Retriever type: single document context")
for param in test_params:
logging.info("Running for chunk size %s", param["chunk_size"])
test_id, result = await run_test(
param, loader_settings, metadata, retriever_type=retriever_type
) # Add the params to the result
# result.append(param)
results.append(result)

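    # Persist each evaluation result to the TestOutput table.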
for b in results:
logging.info("Loading %s", str(b))
for result, chunk in b:
logging.info("Loading %s", str(result))
await add_entity(
session,
TestOutput(
id=test_id,
test_set_id=test_set_id,
operation_id=job_id,
set_id=str(uuid.uuid4()),
user_id=user_id,
test_results=result["success"],
test_score=str(result["score"]),
test_metric_name=result["metric_name"],
test_query=result["query"],
test_output=result["output"],
                    test_expected_output=str(result["expected_output"]),
test_context=result["context"][0],
test_params=str(chunk), # Add params to the database table
),
)

await update_entity(session, Operation, job_id, "COMPLETED")

return results


async def main():
