diff --git a/.github/workflows/update_test_file_ratings.yml b/.github/workflows/update_test_file_ratings.yml
index a767b9e9df..a8b17c6333 100644
--- a/.github/workflows/update_test_file_ratings.yml
+++ b/.github/workflows/update_test_file_ratings.yml
@@ -5,6 +5,9 @@ on:
     paths:
       - ".github/workflows/update_test_file_ratings.yml"
       - "torchci/scripts/calculate_file_test_rating.py"
+      - "torchci/scripts/test_calculate_file_test_rating.py"
+      - "torchci/scripts/td_heuristic_historical_edited_files.py"
+      - "torchci/scripts/td_heuristic_profiling.py"
       - "torchci/scripts/get_merge_base_info.py"
   schedule:
     - cron: 5 11 * * *  # At 11:05 UTC every day, or about 4am PT
@@ -46,6 +49,10 @@ jobs:
       - name: Generate file test ratings
         run: |
           python3 test-infra/torchci/scripts/calculate_file_test_rating.py
+          python3 test-infra/torchci/scripts/td_heuristic_historical_edited_files.py
+          # Do not run this one; its output doesn't change
+          # python3 test-infra/torchci/scripts/td_heuristic_profiling.py
+
         env:
           ROCKSET_API_KEY: ${{ secrets.ROCKSET_API_KEY }}
 
@@ -76,3 +83,17 @@ jobs:
           user_email: "test-infra@pytorch.org"
           user_name: "Pytorch Test Infra"
           commit_message: "Updating file to test class correlations"
+
+      - name: Push historical edited files heuristic to test-infra repository
+        if: github.event_name != 'pull_request'
+        uses: dmnemec/copy_file_to_another_repo_action@eebb594efdf52bc12e1b461988d7254322dac131
+        env:
+          API_TOKEN_GITHUB: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          source_file: "td_heuristic_historical_edited_files.json"
+          destination_repo: "pytorch/test-infra"
+          destination_folder: "stats"
+          destination_branch: generated-stats
+          user_email: "test-infra@pytorch.org"
+          user_name: "Pytorch Test Infra"
+          commit_message: "Updating TD heuristic: historical edited files"
diff --git a/.gitignore b/.gitignore
index 6edd7944cc..c8a5e71053 100644
--- a/.gitignore
+++ b/.gitignore
@@ -125,3 +125,6 @@ docs/_build/
 
 # Pyenv
 .python-version
+
+# torchci caching utils
+.torchci_python_utils_cache
diff --git a/torchci/scripts/get_merge_base_info.py b/torchci/scripts/get_merge_base_info.py
index dbab80d748..123b17c0dc 100644
--- a/torchci/scripts/get_merge_base_info.py
+++ b/torchci/scripts/get_merge_base_info.py
@@ -1,9 +1,13 @@
 import subprocess
 from datetime import datetime
-from multiprocessing import Pool
 from pathlib import Path
 from typing import List
 
+from utils_td_heuristics import (
+    list_past_year_shas,
+    run_command,
+)
+
 from rockset_utils import query_rockset, remove_from_rockset, upload_to_rockset
 
 REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -19,6 +23,16 @@
 mb.merge_base is null
 """
 
+NOT_IN_MERGE_BASES_TABLE = """
+select
+    shas.sha as head_sha
+from
+    unnest(SPLIT(:shas, ',') as sha) as shas
+    left outer join commons.merge_bases mb on mb.sha = shas.sha
+where
+    mb.sha is null
+    or mb.repo is null
+"""
 
 DUP_MERGE_BASE_INFO = """
 select
@@ -99,18 +113,20 @@ def upload_merge_base_info(shas: List[str]) -> None:
     print(
         f"There are {len(failed_test_shas)} shas, uploading in intervals of {interval}"
     )
-    pool = Pool(20)
-    errors = []
     for i in range(0, len(failed_test_shas), interval):
         pull_shas(failed_test_shas[i : i + interval])
-        errors.append(
-            pool.apply_async(
-                upload_merge_base_info, args=(failed_test_shas[i : i + interval],)
-            )
-        )
-    print("done pulling")
-    pool.close()
-    pool.join()
-    for i in errors:
-        if i.get() is not None:
-            print(i.get())
+        upload_merge_base_info(failed_test_shas[i : i + interval])
+
+    interval = 500
+    main_branch_shas = list_past_year_shas()
+    print(f"There are {len(main_branch_shas)} shas, uploading in batches of {interval}")
+    for i in range(0, len(main_branch_shas), interval):
+        shas = [
+            x["head_sha"]
+            for x in query_rockset(
+                NOT_IN_MERGE_BASES_TABLE,
+                {"shas": ",".join(main_branch_shas[i : i + interval])},
+            )
+        ]
+        upload_merge_base_info(shas)
+        print(f"{i} to {i + interval} done")
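Aside: NOT_IN_MERGE_BASES_TABLE above is an anti-join — from each batch of shas it keeps only those with no complete row in merge_bases. A minimal sketch of the same filter in plain Python, where the made-up `known` dict stands in for the merge_bases table (sha -> repo, with None where the repo column is unfilled):

    def missing_shas(batch, known):
        # Keep shas that have no row at all, or a row with a null repo column
        return [sha for sha in batch if sha not in known or known[sha] is None]

    print(missing_shas(["a1", "b2", "c3"], {"a1": "pytorch/pytorch", "b2": None}))
    # prints ['b2', 'c3'] -- only these need their merge base backfilled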
diff --git a/torchci/scripts/rockset_utils.py b/torchci/scripts/rockset_utils.py
index 11428c44f5..267e67a962 100644
--- a/torchci/scripts/rockset_utils.py
+++ b/torchci/scripts/rockset_utils.py
@@ -2,7 +2,9 @@
 import os
 from typing import Any, Dict, List, Optional
 
-import rockset  # type: ignore[import]
+import rockset
+
+from utils import cache_json  # type: ignore[import]
 
 
 @lru_cache
@@ -13,16 +15,16 @@ def get_rockset_client():
 
 
 def query_rockset(
-    query: str, params: Optional[Dict[str, Any]] = None
+    query: str, params: Optional[Dict[str, Any]] = None, use_cache: bool = False
 ) -> List[Dict[str, Any]]:
-    res: List[Dict[str, Any]] = (
-        rockset.RocksetClient(
-            host="api.rs2.usw2.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
-        )
-        .sql(query, params=params)
-        .results
-    )
-    return res
+    if not use_cache:
+        return get_rockset_client().sql(query, params=params).results
+
+    @cache_json
+    def cache_query_rockset(query, params):
+        return get_rockset_client().sql(query, params=params).results
+
+    return cache_query_rockset(query, params)
 
 
 def upload_to_rockset(
diff --git a/torchci/scripts/td_heuristic_historical_edited_files.py b/torchci/scripts/td_heuristic_historical_edited_files.py
new file mode 100644
index 0000000000..96dad907cd
--- /dev/null
+++ b/torchci/scripts/td_heuristic_historical_edited_files.py
@@ -0,0 +1,64 @@
+import json
+from collections import defaultdict
+from typing import Dict
+
+from utils_td_heuristics import (
+    cache_json,
+    evaluate,
+    get_all_invoking_files,
+    get_filtered_failed_tests,
+    get_merge_bases_dict,
+    list_past_year_shas,
+    query_rockset,
+)
+
+CHANGED_FILES_QUERY = """
+select
+    sha,
+    changed_files
+from
+    commons.merge_bases
+where
+    ARRAY_CONTAINS(SPLIT(:shas, ','), sha)
+"""
+
+
+@cache_json
+def gen_correlation_dict() -> Dict[str, Dict[str, float]]:
+    shas = list_past_year_shas()
+
+    interval = 500
+    commits = []
+    for i in range(0, len(shas), interval):
+        commits.extend(
+            query_rockset(
+                CHANGED_FILES_QUERY,
+                params={"shas": ",".join(shas[i : i + interval])},
+                use_cache=True,
+            )
+        )
+
+    invoking_files = get_all_invoking_files()
+
+    d = defaultdict(lambda: defaultdict(float))
+    for commit in commits:
+        changed_files = commit["changed_files"]
+        # Full names of test files look like test/<name>.py, but invoking
+        # files from Rockset don't include the test/ prefix or the .py
+        # extension, so strip those before matching
+        test_files = [x[5:-3] for x in changed_files if x[5:-3] in invoking_files]
+        for test_file in test_files:
+            for file in changed_files:
+                d[file][test_file] += 1 / len(changed_files)
+    return d
+
+
+if __name__ == "__main__":
+    correlation_dict = gen_correlation_dict()
+    merge_bases = get_merge_bases_dict()
+    filtered_tests = get_filtered_failed_tests()
+
+    evaluate(filtered_tests, merge_bases, correlation_dict)
+
+    with open("td_heuristic_historical_edited_files.json", mode="w") as file:
+        json.dump(correlation_dict, file, sort_keys=True, indent=2)
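Aside: a worked example of the 1 / len(changed_files) weighting in gen_correlation_dict, with invented file names. A commit touches four files, one of which is a test file, so every changed file gets 1/4 of a point of correlation with that test:

    from collections import defaultdict

    changed_files = ["torch/nn/linear.py", "torch/functional.py", "test/test_nn.py", "README.md"]
    invoking_files = {"test_nn"}  # invoking files come without the test/ prefix or .py suffix

    d = defaultdict(lambda: defaultdict(float))
    test_files = [f[5:-3] for f in changed_files if f[5:-3] in invoking_files]
    for test_file in test_files:
        for file in changed_files:
            d[file][test_file] += 1 / len(changed_files)

    print(d["torch/nn/linear.py"]["test_nn"])  # 0.25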
diff --git a/torchci/scripts/td_heuristic_profiling.py b/torchci/scripts/td_heuristic_profiling.py
new file mode 100644
index 0000000000..7b1a319105
--- /dev/null
+++ b/torchci/scripts/td_heuristic_profiling.py
@@ -0,0 +1,26 @@
+import json
+
+import requests
+from utils_td_heuristics import evaluate, get_filtered_failed_tests, get_merge_bases_dict
+
+
+def get_profiling_dict():
+    # The dict should be generated elsewhere and this function modified to
+    # retrieve the data.
+    url = "https://raw.githubusercontent.com/pytorch/test-infra/generated-stats/stats/td_heuristic_profiling.json"
+    return json.loads(requests.get(url).text)
+
+
+def main() -> None:
+    profiling_dict = get_profiling_dict()
+    merge_bases = get_merge_bases_dict()
+    filtered_tests = get_filtered_failed_tests()
+
+    evaluate(filtered_tests, merge_bases, profiling_dict)
+
+    with open("td_heuristic_profiling.json", mode="w") as file:
+        json.dump(profiling_dict, file, sort_keys=True, indent=2)
+
+
+if __name__ == "__main__":
+    main()
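Aside: both heuristic scripts produce a dict shaped like changed file -> test file -> score. A hedged sketch (file names invented) of how a consumer could turn such a dict into a test ordering; it mirrors the prediction loop inside evaluate() in utils_td_heuristics.py below:

    from collections import defaultdict
    from typing import Dict, List

    def rank_tests(changed_files: List[str], mapping: Dict[str, Dict[str, float]]) -> List[str]:
        # Sum the per-file scores for every candidate test, then sort best-first
        prediction = defaultdict(float)
        for file in changed_files:
            for test_file, score in mapping.get(file, {}).items():
                prediction[test_file] += score
        return sorted(prediction, key=prediction.get, reverse=True)

    mapping = {"torch/nn/linear.py": {"test_nn": 0.5, "test_jit": 0.1}}
    print(rank_tests(["torch/nn/linear.py"], mapping))  # ['test_nn', 'test_jit']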
diff --git a/torchci/scripts/utils.py b/torchci/scripts/utils.py
new file mode 100644
index 0000000000..e283da7802
--- /dev/null
+++ b/torchci/scripts/utils.py
@@ -0,0 +1,69 @@
+import datetime
+import json
+import os
+import pathlib
+import subprocess
+from hashlib import sha256
+from typing import List, Union
+
+
+FILE_CACHE_LIFESPAN_SECONDS = 60 * 60 * 24  # 1 day
+REPO_ROOT = pathlib.Path(__file__).parent.parent.parent
+CACHE_FOLDER = REPO_ROOT / ".torchci_python_utils_cache"
+
+
+def js_beautify(obj):
+    # Like json.dumps with indent=2, but only at the first level. Nice for
+    # dictionaries of str -> really long list
+    import jsbeautifier
+
+    opts = jsbeautifier.default_options()
+    opts.indent_size = 2
+    return jsbeautifier.beautify(json.dumps(obj), opts)
+
+
+def run_command(command: Union[str, List[str]]) -> str:
+    # Runs the command in the pytorch folder. Assumes test-infra and pytorch
+    # are checked out side by side.
+    if isinstance(command, str):
+        command = command.split(" ")
+    cwd = REPO_ROOT / ".." / "pytorch"
+    return (
+        subprocess.check_output(
+            command,
+            cwd=cwd,
+        )
+        .decode("utf-8")
+        .strip()
+    )
+
+
+def cache_json(func):
+    # Decorator for caching function results in a file so they can be reused
+    # between runs. Requires that both inputs and outputs be json serializable.
+    os.makedirs(CACHE_FOLDER, exist_ok=True)
+
+    def wrapper(*args, **kwargs):
+        os.makedirs(CACHE_FOLDER, exist_ok=True)
+        args_key = sha256(json.dumps(args).encode("utf-8")).hexdigest()
+        kwargs_key = sha256(
+            json.dumps(kwargs, sort_keys=True).encode("utf-8")
+        ).hexdigest()
+        file_name = f"{func.__name__} args={args_key} kwargs={kwargs_key}.json"
+
+        if os.path.exists(CACHE_FOLDER / file_name):
+            now = datetime.datetime.now()
+            mtime = datetime.datetime.fromtimestamp(
+                (CACHE_FOLDER / file_name).stat().st_mtime
+            )
+            diff = now - mtime
+            if diff.total_seconds() < FILE_CACHE_LIFESPAN_SECONDS:
+                with open(CACHE_FOLDER / file_name) as f:
+                    return json.load(f)
+
+        res = func(*args, **kwargs)
+        with open(CACHE_FOLDER / file_name, "w") as f:
+            f.write(json.dumps(res))
+        return res
+
+    return wrapper
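Aside: a usage sketch for the cache_json decorator above; expensive_lookup is a hypothetical function, not part of this PR. The first call runs the function and writes the result as JSON under .torchci_python_utils_cache/; repeat calls with the same arguments within FILE_CACHE_LIFESPAN_SECONDS (one day) are read back from disk:

    from utils import cache_json

    @cache_json
    def expensive_lookup(name: str) -> dict:
        print("computing...")  # only printed on a cache miss
        return {"name": name, "value": 42}

    expensive_lookup("foo")  # computes, prints, and writes the cache file
    expensive_lookup("foo")  # served from the JSON file, no print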
diff --git a/torchci/scripts/utils_td_heuristics.py b/torchci/scripts/utils_td_heuristics.py
new file mode 100644
index 0000000000..75c70dd174
--- /dev/null
+++ b/torchci/scripts/utils_td_heuristics.py
@@ -0,0 +1,170 @@
+import csv
+import json
+from collections import defaultdict
+from functools import lru_cache
+from typing import Any, Dict, List
+
+import requests
+from rockset_utils import query_rockset
+from utils import cache_json, run_command
+
+
+def list_past_year_shas():
+    return run_command(["git", "log", "--pretty=%H", "--since='1 year'"]).splitlines()
+
+
+def download_test_times():
+    return json.loads(
+        requests.get(
+            "https://raw.githubusercontent.com/pytorch/test-infra/generated-stats/stats/test-times.json"
+        ).text
+    )
+
+
+@lru_cache
+def get_all_invoking_files() -> List[str]:
+    invoking_files = """
+    select
+        distinct invoking_file
+    from
+        commons.test_run_summary t
+    """
+    return [
+        x["invoking_file"].replace(".", "/")
+        for x in query_rockset(invoking_files, use_cache=True)
+    ]
+
+
+@cache_json
+def filter_tests(failed_tests, merge_bases):
+    # Remove tests that don't have a merge base or that also fail on the merge base.
+
+    tests_by_sha = defaultdict(list)
+    for test in failed_tests:
+        sha = test["head_sha"]
+        tests_by_sha[sha].append(test)
+
+    not_present_on_merge_base = []
+    for test in failed_tests:
+        sha = test["head_sha"]
+        if sha not in merge_bases:
+            # Should only happen if the rockset table is unfilled, or if the
+            # sha doesn't exist somehow
+            continue
+        merge_base = merge_bases[sha]["merge_base"]
+        present_on_merge_base = False
+        for base_test in tests_by_sha.get(merge_base, []):
+            if (
+                base_test["invoking_file"] == test["invoking_file"]
+                and base_test["name"] == test["name"]
+                and base_test["classname"] == test["classname"]
+                and base_test["file"] == test["file"]
+            ):
+                present_on_merge_base = True
+                break
+        if not present_on_merge_base:
+            not_present_on_merge_base.append(test)
+    return not_present_on_merge_base
+
+
+def avg(l: List[float]) -> str:
+    if len(l) == 0:
+        return "N/A"
+    return f"{sum(l) / len(l):.2f}"
+
+
+def med(l: List[float]) -> str:
+    if len(l) == 0:
+        return "N/A"
+    return f"{sorted(l)[len(l) // 2]:.2f}"
+
+
+def evaluate(
+    tests: List[Dict[str, Any]],
+    merge_bases: Dict[str, Dict[str, Any]],
+    rev_mapping: Dict[str, Dict[str, float]],
+) -> None:
+    # Creates a file called results.csv with information about the ordering of
+    # tests. Its output isn't consumed anywhere; it is meant to help evaluate
+    # whether the current rating/calculation is good.
+
+    all_invoking_files = get_all_invoking_files()
+
+    scores = []
+    score_per_file = defaultdict(list)
+    for test in tests:
+        changed_files = merge_bases[test["head_sha"]]["changed_files"]
+
+        prediction = defaultdict(int)
+        for file in changed_files:
+            for test_file, score in rev_mapping.get(file, {}).items():
+                prediction[test_file] += score
+
+        test_files_sorted_by_score = [
+            x[0] for x in sorted(prediction.items(), key=lambda x: x[1], reverse=True)
+        ]
+        invoking_file = test["invoking_file"]
+        position = {}
+        for i, file in enumerate(test_files_sorted_by_score):
+            position[file] = (i + 1) / len(all_invoking_files)
+        scores.append(position.get(invoking_file, 1))
+        for file in all_invoking_files:
+            score_per_file[file].append((position.get(file, 1), invoking_file == file))
+
+    print(f"average: {avg(scores)}")
+    print(f"median: {med(scores)}")
+    print(f"within 10%: {(len([x for x in scores if x < .1]))/len(scores)}")
+    print(f"# of invoking files: {len(all_invoking_files)}")
+
+    res = []
+    for file, raw_scores in score_per_file.items():
+        scores = [x[0] for x in raw_scores]
+        wrong_scores = [x[0] for x in raw_scores if not x[1]]
+        right_scores = [x[0] for x in raw_scores if x[1]]
+        res.append(
+            {
+                "file": file,
+                "average": avg(scores),
+                "median": med(scores),
+                "average wrong": avg(wrong_scores),
+                "median wrong": med(wrong_scores),
+                "average right": avg(right_scores),
+                "median right": med(right_scores),
+                "count": len(right_scores),
+            }
+        )
+
+    with open("results.csv", "w") as csvfile:
+        writer = csv.DictWriter(csvfile, res[0].keys())
+        writer.writeheader()
+        writer.writerows(res)
+
+
+@cache_json
+def get_merge_bases_dict() -> Dict[str, Dict[str, Any]]:
+    # Returns a dictionary of commit sha -> dictionary with info about that
+    # commit, including changed files and merge base
+    merge_bases_query = """
+    select * from merge_bases where repo is null or repo = 'pytorch/pytorch'
+    """
+    merge_bases_list = query_rockset(merge_bases_query)
+    return {s["sha"]: s for s in merge_bases_list}
+
+
+@cache_json
+def get_filtered_failed_tests() -> List[Dict[str, Any]]:
+    failed_tests_query = """
+    SELECT
+        distinct REPLACE(t.invoking_file, '.', '/') as invoking_file,
+        t.name,
+        t.classname,
+        t.file,
+        j.head_sha,
+    FROM
+        commons.failed_tests_run t
+        join workflow_job j on t.job_id = j.id
+    where
+        t.file is not null
+    """
+    failed_tests = query_rockset(failed_tests_query, use_cache=True)
+    return filter_tests(failed_tests, get_merge_bases_dict())
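Aside: a worked example of the normalized-position metric used in evaluate(), with made-up invoking files. With 4 invoking files total and the actually-failing file ranked second by the heuristic, its score is 2/4 = 0.5; anything the heuristic never ranks gets the worst score, 1:

    all_invoking_files = ["test_a", "test_b", "test_c", "test_d"]
    ranked = ["test_c", "test_b"]  # heuristic output, best first

    position = {f: (i + 1) / len(all_invoking_files) for i, f in enumerate(ranked)}
    print(position.get("test_b", 1))  # 0.5 (ranked 2nd out of 4)
    print(position.get("test_a", 1))  # 1 (unranked -> worst possible score)

Lower is better, so the "within 10%" line in the printed summary counts the failures whose test file landed in the top tenth of the predicted ordering.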