diff --git a/benchmarks/_template/requirements.cpu.txt b/benchmarks/_template/requirements.cpu.txt new file mode 100644 index 000000000..e0058b822 --- /dev/null +++ b/benchmarks/_template/requirements.cpu.txt @@ -0,0 +1,46 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=benchmarks/_template/requirements.cpu.txt benchmarks/_template/requirements.in +# +antlr4-python3-runtime==4.9.3 + # via omegaconf +asttokens==2.4.1 + # via giving +codefind==0.1.3 + # via ptera +executing==1.2.0 + # via varname +giving==0.4.2 + # via + # ptera + # voir +markdown-it-py==3.0.0 + # via rich +mdurl==0.1.2 + # via markdown-it-py +omegaconf==2.3.0 + # via voir +ovld==0.3.2 + # via voir +ptera==1.4.1 + # via voir +pygments==2.17.2 + # via rich +pynvml==11.5.0 + # via voir +pyyaml==6.0.1 + # via omegaconf +reactivex==4.0.4 + # via giving +rich==13.7.0 + # via voir +six==1.16.0 + # via asttokens +typing-extensions==4.10.0 + # via reactivex +varname==0.10.0 + # via giving +voir==0.2.12 + # via -r benchmarks/_template/requirements.in diff --git a/config/examples/ec2-system.yaml b/config/examples/ec2-system.yaml new file mode 100644 index 000000000..a81c09bfc --- /dev/null +++ b/config/examples/ec2-system.yaml @@ -0,0 +1,20 @@ +system: + # Nodes list + nodes: + # Alias used to reference the node + - name: manager + # Use 1.1.1.1 as an ip placeholder + ip: 1.1.1.1 + # Use this node as the master node or not + main: true + # User to use in remote milabench operations + user: user + + # Cloud instances profiles + cloud_profiles: + ec2: + profile: mb_test_sog_3 + username: ubuntu + instance_type: t2.micro + volume_size: 8 + region: us-east-2 diff --git a/config/test.yaml b/config/test.yaml new file mode 100644 index 000000000..060949e40 --- /dev/null +++ b/config/test.yaml @@ -0,0 +1,24 @@ +_defaults: + max_duration: 600 + voir: + options: + stop: 60 + interval: "1s" + +test: + inherits: _defaults + group: test_remote + install_group: test_remote + definition: ../benchmarks/_template + plan: + method: njobs + n: 1 + +testing: + inherits: _defaults + definition: ../benchmarks/_template + group: test_remote_2 + install_group: test_remote_2 + plan: + method: njobs + n: 1 diff --git a/milabench/__init__.py b/milabench/__init__.py index e69de29bb..ac33e6bb3 100644 --- a/milabench/__init__.py +++ b/milabench/__init__.py @@ -0,0 +1,5 @@ +import pathlib + +ROOT_FOLDER = pathlib.Path(__file__).resolve().parent.parent +CONFIG_FOLDER = ROOT_FOLDER / "config" +BENCHMARK_FOLDER = ROOT_FOLDER / "benchmarks" diff --git a/milabench/cli/__init__.py b/milabench/cli/__init__.py index f0eea8d1e..c5de9b0b8 100644 --- a/milabench/cli/__init__.py +++ b/milabench/cli/__init__.py @@ -3,6 +3,7 @@ from coleo import run_cli +from .cloud import cli_cloud from .compare import cli_compare from .dev import cli_dev from .install import cli_install @@ -12,6 +13,7 @@ from .pr import cli_write_report_to_pr from .prepare import cli_prepare from .publish import cli_publish +from .purge_cloud import cli_purge_cloud from .report import cli_report from .run import cli_run from .schedule import cli_schedule @@ -37,6 +39,14 @@ def pin(): """Pin the benchmarks' dependencies.""" cli_pin() + def cloud(): + """Setup cloud instances.""" + cli_cloud() + + def purge_cloud(): + """Purge running cloud instannces the benchmarks' dependencies.""" + cli_purge_cloud() + def dev(): """Create a shell in a benchmark's environment for development.""" cli_dev() diff --git a/milabench/cli/badges/__main__.py b/milabench/cli/badges/__main__.py new file mode 100644 index 000000000..027a59a4b --- /dev/null +++ b/milabench/cli/badges/__main__.py @@ -0,0 +1,45 @@ +import pathlib +import subprocess +import sys + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + try: + import pybadges as _ + except ImportError: + module = pathlib.Path(__file__).resolve().parent + cache_dir = pathlib.Path(f"/tmp/milabench/{module.name}_venv") + python3 = str(cache_dir / "bin/python3") + check_module = "import pybadges" + try: + subprocess.run([python3, "-c", check_module], check=True) + except (FileNotFoundError, subprocess.CalledProcessError): + cache_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([sys.executable, "-m", "virtualenv", str(cache_dir)], check=True) + subprocess.run([python3, "-m", "pip", "install", "-U", "pip"], check=True) + subprocess.run([ + python3, + "-m", + "pip", + "install", + "-r", + str(module / "requirements.txt") + ], check=True) + subprocess.run([python3, "-c", check_module], check=True) + return subprocess.call( + [python3, __file__, *argv], + ) + + return subprocess.run([ + sys.executable, + "-m", + "pybadges", + *argv + ], check=True).returncode + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/milabench/cli/badges/requirements.txt b/milabench/cli/badges/requirements.txt new file mode 100644 index 000000000..26620981a --- /dev/null +++ b/milabench/cli/badges/requirements.txt @@ -0,0 +1 @@ +pybadges \ No newline at end of file diff --git a/milabench/cli/cloud.py b/milabench/cli/cloud.py new file mode 100644 index 000000000..c0f9c9bcb --- /dev/null +++ b/milabench/cli/cloud.py @@ -0,0 +1,153 @@ +from copy import deepcopy +import os +import subprocess +import sys + +from coleo import Option, tooled +import yaml + +# import milabench as mb +from ..common import get_multipack + + +_SETUP = "setup" +_TEARDOWN = "teardown" +_LIST = "list" +_ACTIONS = (_SETUP, _TEARDOWN, _LIST) + + +def manage_cloud(pack, packs, run_on, action="setup"): + assert run_on in pack.config["system"]["cloud_profiles"] + + key_map = { + "hostname":(lambda v: ("ip",v)), + "username":(lambda v: ("user",v)), + "ssh_key_file":(lambda v: ("key",v)), + "env":(lambda v: ("env",[".", v, ";", "conda", "activate", "milabench", "&&"])), + } + plan_params = deepcopy(pack.config["system"]["cloud_profiles"][run_on]) + + nodes = iter(enumerate(pack.config["system"]["nodes"])) + + state_prefix = [] + for p in packs.values(): + state_prefix.append(p.config["name"]) + state_prefix.append(p.config["install_variant"]) + + while True: + try: + i, n = next(nodes) + if n["ip"] != "1.1.1.1": + continue + except StopIteration: + break + + plan_params["state_prefix"] = plan_params.get("state_prefix", None) or "-".join([str(i), *state_prefix]) + plan_params["state_id"] = plan_params.get("state_id", None) or pack.config["hash"] + + import milabench.cli.covalent as cv + + subprocess.run( + [ + sys.executable, + "-m", cv.__name__, + "serve", "start" + ] + , stdout=sys.stderr + , check=True + ) + + cmd = [ + sys.executable, + "-m", cv.__name__, + run_on, + f"--{action}", + *[ + f"--{k.replace('_', '-')}={v}" + for k, v in plan_params.items() + ], + ] + p = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdout_chunks = [] + while True: + line = p.stdout.readline() + if not line: + break + line_str = line.decode("utf-8").strip() + stdout_chunks.append(line_str) + print(line_str, file=sys.stderr) + + if not line_str: + continue + try: + k, v = line_str.split("::>") + k, v = key_map[k](v) + if k == "ip" and n[k] != "1.1.1.1": + i, n = next(nodes) + n[k] = v + except ValueError: + pass + + _, stderr = p.communicate() + stderr = stderr.decode("utf-8").strip() + print(stderr, file=sys.stderr) + + if p.returncode != 0: + stdout = os.linesep.join(stdout_chunks) + raise subprocess.CalledProcessError( + p.returncode, + cmd, + stdout, + stderr + ) + + return pack.config["system"] + + +@tooled +def _setup(): + """Setup a cloud infrastructure""" + + # Setup cloud on target infra + run_on: Option & str + + mp = get_multipack() + setup_pack = mp.setup_pack() + system_config = manage_cloud(setup_pack, mp.packs, run_on, action=_SETUP) + + print(f"# hash::>{setup_pack.config['hash']}") + print(yaml.dump({"system": system_config})) + + +@tooled +def _teardown(): + """Teardown a cloud infrastructure""" + + # Setup cloud on target infra + run_on: Option & str + + mp = get_multipack() + setup_pack = mp.setup_pack() + manage_cloud(setup_pack, mp.packs, run_on, action=_TEARDOWN) + + +@tooled +def cli_cloud(): + """Manage cloud instances.""" + + # Setup a cloud infrastructure + setup: Option & bool = False + # Teardown a cloud infrastructure + teardown: Option & bool = False + + assert any((setup, teardown)) and not all((setup, teardown)) + + if setup: + _setup() + elif teardown: + _teardown() diff --git a/milabench/cli/covalent/__main__.py b/milabench/cli/covalent/__main__.py new file mode 100644 index 000000000..ed722d422 --- /dev/null +++ b/milabench/cli/covalent/__main__.py @@ -0,0 +1,236 @@ +import argparse +import asyncio +import os +import pathlib +import subprocess +import sys +import tempfile + + +def serve(*argv): + return subprocess.run([ + str(pathlib.Path(sys.executable).with_name("covalent")), + *argv + ]).returncode + + +def _get_executor_kwargs(args): + return { + **{k:v for k,v in vars(args).items() if k not in ("setup", "teardown")}, + **{"action":k for k,v in vars(args).items() if k in ("setup", "teardown") and v}, + } + + +def executor(executor_cls, args, *argv): + import covalent as ct + + executor:ct.executor.BaseExecutor = executor_cls( + **_get_executor_kwargs(args), + ) + + def _popen(cmd, *args, _env=None, **kwargs): + # _debug_f = open("/home/ubuntu/debug", "wt") + # print(cmd, *args, _env, kwargs, sep="\n", file=_debug_f, flush=True) + _env = _env if _env is not None else {} + + for envvar in _env.keys(): + envvar_val = _env[envvar] + + if not envvar_val: + continue + + envvar_val = pathlib.Path(envvar_val).expanduser() + if str(envvar_val) != _env[envvar]: + _env[envvar] = str(envvar_val) + + if "MILABENCH_CONFIG_CONTENT" in _env: + _config_dir = pathlib.Path(_env["MILABENCH_CONFIG"]).parent + with tempfile.NamedTemporaryFile("wt", dir=str(_config_dir), suffix=".yaml", delete=False) as _f: + _f.write(_env["MILABENCH_CONFIG_CONTENT"]) + _env["MILABENCH_CONFIG"] = _f.name + + try: + cmd = (str(pathlib.Path(cmd[0]).expanduser()), *cmd[1:]) + except IndexError: + pass + + cwd = kwargs.pop("cwd", None) + if cwd is not None: + cwd = str(pathlib.Path(cwd).expanduser()) + kwargs["cwd"] = cwd + + _env = {**os.environ.copy(), **kwargs.pop("env", {}), **_env} + + kwargs = { + **kwargs, + "env": _env, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + # print(cmd, *args, _env, kwargs, sep="\n", file=_debug_f, flush=True) + p = subprocess.Popen(cmd, *args, **kwargs) + + stdout_chunks = [] + while True: + line = p.stdout.readline() + if not line: + break + line_str = line.decode("utf-8").strip() + stdout_chunks.append(line_str) + print(line_str) + + _, stderr = p.communicate() + stderr = stderr.decode("utf-8").strip() + stdout = os.linesep.join(stdout_chunks) + + if p.returncode != 0: + raise subprocess.CalledProcessError( + p.returncode, + (cmd, args, kwargs), + stdout, + stderr + ) + return p.returncode, stdout, stderr + + @ct.lattice + def lattice(argv=(), deps_bash = None): + return ct.electron( + _popen, + executor=executor, + deps_bash=deps_bash, + )( + argv, + ) + + return_code = 0 + try: + dispatch_id = None + result = None + deps_bash = None + + if not argv and args.setup: + conda_prefix = "eval \"$(conda shell.bash hook)\"" + conda_activate = "conda activate milabench" + deps_bash = [] + for _cmd in ( + f"{conda_activate} || conda create -n milabench -y", + f"{conda_activate}" + f" && conda install python={sys.version_info.major}.{sys.version_info.minor} virtualenv pip -y" + f" || >&2 echo First attempt to install python in milabench env failed", + f"{conda_activate}" + f" && conda install python={sys.version_info.major}.{sys.version_info.minor} virtualenv pip -y" + f" || conda remove -n milabench --all -y", + ): + deps_bash.append(f"{conda_prefix} && ({_cmd})") + deps_bash = ct.DepsBash(deps_bash) + argv = ["conda", "env", "list"] + + if argv: + dispatch_id = ct.dispatch(lattice, disable_run=False)(argv, deps_bash=deps_bash) + result = ct.get_result(dispatch_id=dispatch_id, wait=True) + return_code, stdout, _ = result.result if result.result is not None else (1, "", "") + + if return_code == 0 and args.setup: + assert any([l for l in stdout.split("\n") if l.startswith("milabench ")]) + _executor:ct.executor.BaseExecutor = executor_cls( + **{ + **_get_executor_kwargs(args), + **{"action": "teardown"}, + } + ) + asyncio.run(_executor.setup({})) + + assert _executor.hostname + print(f"hostname::>{_executor.hostname}") + print(f"username::>{_executor.username}") + print(f"ssh_key_file::>{_executor.ssh_key_file}") + print(f"env::>~/.condaenvrc") + finally: + result = ct.get_result(dispatch_id=dispatch_id, wait=False) if dispatch_id else None + results_dir = result.results_dir if result else "" + if args.teardown: + try: + _executor:ct.executor.BaseExecutor = executor_cls( + **{ + **_get_executor_kwargs(args), + **{"action": "teardown"}, + } + ) + asyncio.run(_executor.setup({})) + asyncio.run( + _executor.teardown( + {"dispatch_id": dispatch_id, "node_id": 0, "results_dir": results_dir} + ) + ) + except FileNotFoundError: + pass + + return return_code + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + try: + import covalent as ct + ct.get_config(f"executors.ec2") + except (KeyError, ImportError): + module = pathlib.Path(__file__).resolve().parent + cache_dir = pathlib.Path(f"/tmp/milabench/{module.name}_venv") + python3 = str(cache_dir / "bin/python3") + check_module = "import covalent ; from covalent.executor import EC2Executor" + try: + subprocess.run([python3, "-c", check_module], check=True) + except (FileNotFoundError, subprocess.CalledProcessError): + cache_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([sys.executable, "-m", "virtualenv", str(cache_dir)], check=True) + subprocess.run([python3, "-m", "pip", "install", "-U", "pip"], check=True) + subprocess.run([ + python3, + "-m", + "pip", + "install", + "-r", + str(module / "requirements.txt") + ], check=True) + subprocess.run([python3, "-c", check_module], check=True) + return subprocess.call( + [python3, __file__, *argv], + ) + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + subparser = subparsers.add_parser("serve") + subparser.add_argument(f"argv", nargs=argparse.REMAINDER) + for p in ("ec2",): + try: + config = ct.get_config(f"executors.{p}") + except KeyError: + continue + subparser = subparsers.add_parser(p) + subparser.add_argument(f"--setup", action="store_true") + subparser.add_argument(f"--teardown", action="store_true") + for param, default in config.items(): + if param == "action": + continue + subparser.add_argument(f"--{param.replace('_', '-')}", default=default) + + try: + cv_argv, argv = argv[:argv.index("--")], argv[argv.index("--")+1:] + except ValueError: + cv_argv, argv = argv, [] + + args = parser.parse_args(cv_argv) + + if cv_argv[0] == "serve": + assert not argv + return serve(*args.argv) + elif cv_argv[0] == "ec2": + return executor(ct.executor.EC2Executor, args, *argv) + else: + raise + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/milabench/cli/covalent/requirements.txt b/milabench/cli/covalent/requirements.txt new file mode 100644 index 000000000..f810e6eaf --- /dev/null +++ b/milabench/cli/covalent/requirements.txt @@ -0,0 +1,2 @@ +covalent +covalent-ec2-plugin @ git+https://github.com/satyaog/covalent-ec2-plugin.git@feature/milabench \ No newline at end of file diff --git a/milabench/cli/purge_cloud.py b/milabench/cli/purge_cloud.py new file mode 100644 index 000000000..d6a380d7b --- /dev/null +++ b/milabench/cli/purge_cloud.py @@ -0,0 +1,51 @@ +from dataclasses import dataclass + +from coleo import Option, tooled + +from milabench.utils import validation_layers + +from .install import arguments +from ..common import get_multipack, run_with_loggers +from ..log import DataReporter, TerminalFormatter, TextReporter + + +# fmt: off +@dataclass +class Arguments: + shorttrace: bool = False + variant: str = None +# fmt: on + + +@tooled +def arguments(): + # On error show full stacktrace + shorttrace: Option & bool = False + + # Install variant + variant: Option & str = None + + return Arguments(shorttrace, variant) + + +@tooled +def cli_purge_cloud(args=None): + """Purge running cloud instannces the benchmarks' dependencies.""" + if args is None: + args = arguments() + + overrides = {"*": {"install_variant": args.variant}} if args.variant else {} + + mp = get_multipack(overrides=overrides) + + return run_with_loggers( + mp.do_purge_cloud(), + loggers=[ + TerminalFormatter(), + TextReporter("stdout"), + TextReporter("stderr"), + DataReporter(), + *validation_layers("error", short=args.shorttrace), + ], + mp=mp, + ) diff --git a/milabench/cli/report.py b/milabench/cli/report.py index cbad44223..b14b49528 100644 --- a/milabench/cli/report.py +++ b/milabench/cli/report.py @@ -1,10 +1,12 @@ +import glob import os import sys from dataclasses import dataclass, field from coleo import Option, config as configuration, tooled -from ..common import Option, _error_report, _get_multipack, _read_reports +from ..common import Option, _error_report, _get_multipack, _push_reports, _read_reports +from ..fs import XPath from ..report import make_report from ..summary import make_summary @@ -12,12 +14,13 @@ # fmt: off @dataclass class Arguments: - runs: list = field(default_factory=list) + runs : list = field(default_factory=list) config : str = os.environ.get("MILABENCH_CONFIG", None) compare : str = None compare_gpus: bool = False html : str = None price : int = None + push : bool = False # fmt: on @@ -42,7 +45,10 @@ def arguments(): # Price per unit price: Option & int = None - return Arguments(runs, config, compare, compare_gpus, html, price) + # Push reports to repo + push: Option & bool = False + + return Arguments(runs, config, compare, compare_gpus, html, price, push) @tooled @@ -68,11 +74,6 @@ def cli_report(args=None): # ------ # 1 errors, details in HTML report. - reports = None - if args.runs: - reports = _read_reports(*args.runs) - summary = make_summary(reports.values()) - if args.config: from milabench.common import arguments as multipack_args @@ -81,6 +82,25 @@ def cli_report(args=None): args.config = _get_multipack(margs, return_config=True) + assert args.config if args.push else None + + if not args.runs and args.config: + run_dirs = {XPath(pack_config["dirs"]["runs"]) for pack_config in args.config.values()} + filter = lambda _p: not any([XPath(_p).name.startswith(f"{prefix}.") for prefix in ("install", "prepare")]) + args.runs = sorted( + {_r + for _rd in run_dirs + for _r in glob.glob(str(_rd / "*.*.*/")) + if filter(_r) + }, + key=lambda _p: XPath(_p).name.split(".")[-2:] + ) + + reports = None + if args.runs: + reports = _read_reports(*args.runs) + summary = make_summary(reports.values()) + make_report( summary, compare=args.compare, @@ -93,3 +113,10 @@ def cli_report(args=None): errdata=reports and _error_report(reports), stream=sys.stdout, ) + + if len(reports) and args.push: + reports_repo = next(iter( + XPath(pack_config["dirs"]["base"]) / "reports" + for pack_config in args.config.values() + )) + _push_reports(reports_repo, args.runs, summary) diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index 30ce3ffa8..00284208d 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py @@ -399,6 +399,11 @@ def is_local(self): == localnode["hostname"] # The hostname is the local node ) + def _load_env(self, node): + if node.get("env", None): + return node["env"] + return [] + def _argv(self, **kwargs) -> List: # No-op when executing on a local node if self.is_local(): @@ -410,13 +415,14 @@ def _argv(self, **kwargs) -> List: host = f"{user}@{self.host}" if user else self.host argv = super()._argv(**kwargs) - argv.extend(["-oPasswordAuthentication=no"]) - argv.extend(["-p", str(self.port)]) - if key: - argv.append(f"-i{key}") + # scp apparently needs `-i` to be first + argv.insert(1, f"-i{key}") + argv.append(f"-p{self.port}") argv.append(host) + argv.extend(self._load_env(node)) + return argv @@ -427,21 +433,27 @@ def __init__( self, pack: pack.BasePackage, host: str, - directory: str, + src: str, *scp_argv, + dest: str = None, user: str = None, key: str = None, **kwargs, ) -> None: super().__init__(pack, host, "-r", *scp_argv, user=user, key=key, **kwargs) - self.dir = directory + self.src = src + self.dest = dest if dest is not None else self.src + + def _load_env(self, node): + del node + return [] def _argv(self, **kwargs) -> List: argv = super()._argv(**kwargs) host = argv.pop() - argv.append(self.dir) - argv.append(f"{host}:{self.dir}") + argv.append(self.src) + argv.append(f"{host}:{self.dest}") return argv diff --git a/milabench/common.py b/milabench/common.py index 35f9cf125..d5d5d8f27 100644 --- a/milabench/common.py +++ b/milabench/common.py @@ -1,16 +1,21 @@ +from copy import deepcopy import io import json import os import re import runpy +import subprocess import sys import traceback from dataclasses import dataclass, field from datetime import datetime from coleo import Option, default, tooled +import git from omegaconf import OmegaConf from voir.instruments.gpu import deduce_backend, select_backend +import yaml +from milabench import ROOT_FOLDER from milabench.alt_async import proceed from milabench.utils import available_layers, blabla, multilogger @@ -194,6 +199,13 @@ def _get_multipack( if args.config is None: sys.exit("Error: CONFIG argument not provided and no $MILABENCH_CONFIG") + if args.system is None: + args.system = os.environ.get("MILABENCH_SYSTEM", None) + + if args.system is None: + if XPath(f"{args.config}.system").exists(): + args.system = f"{args.config}.system" + if args.select: args.select = set(args.select.split(",")) @@ -255,7 +267,7 @@ def is_selected(defn): return selected_config else: return MultiPackage( - {name: get_pack(defn) for name, defn in selected_config.items()} + {name: get_pack(deepcopy(defn)) for name, defn in selected_config.items()} ) @@ -296,6 +308,156 @@ def _read_reports(*runs): return all_data +def _find_metas(reports): + local_meta = next(iter(e for _r in reports for e in _r if e["event"] == "meta"), None) + if local_meta: + local_meta = local_meta["data"] + remote_metas = [] + for _r in reports: + meta_lines = [] + for event in _r: + _, event_type, line = None, "", [] + + try: + _, event_type, *line = event["data"].split(" ") + except (AttributeError, ValueError): + pass + + if event_type[:1] + event_type[-1:] != "[]": + event_type = None + line = event["data"] + else: + line = " ".join(line) + + if event_type == "[meta]": + meta_lines.append(line) + elif event_type is None and meta_lines: + meta_lines.append(line) + elif meta_lines: + remote_metas.append(yaml.safe_load("".join(meta_lines))) + meta_lines = [] + + return local_meta, remote_metas + + +def _filter_reports(*reports): + all_reports = [] + + for report in reports: + config = next(iter(e for e in report if e["event"] == "config"), None) + if config is None: + continue + + if config["data"]["name"] != "remote": + all_reports.append(report) + + return all_reports + + +def _push_reports(reports_repo, runs, packs:dict=None): + _SVG_COLORS = { + "pass": "blue", + "partial": "yellow", + "failure": "red", + } + import milabench.cli.badges as badges + + _repo = git.repo.base.Repo(ROOT_FOLDER) + try: + reports_repo = git.repo.base.Repo(str(reports_repo)) + except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): + repo_url = next(iter(_r.url for _r in _repo.remotes if _r.name == "origin"), None) + reports_repo = git.repo.base.Repo.clone_from(repo_url, str(reports_repo), branch="reports") + + reports_url = [_r.url for _r in _repo.remotes if "mila-iqia" in _r.url][0] + reports_url = XPath(":".join(reports_url.split(":")[1:])) + reports_url = XPath("https://github.com") / f"{reports_url.with_suffix('')}/tree/{reports_repo.active_branch.name}" + + device_reports = {} + for run in runs: + reports = list(_read_reports(run).values()) + reports = _filter_reports(*reports) + + if not reports: + continue + + meta = [e["data"] for _r in reports for e in _r if e["event"] == "meta"] + + for _meta in meta: + for gpu in _meta["accelerators"]["gpus"].values(): + device = gpu["product"].replace(" ", "_") + break + else: + for _meta in meta: + device = _meta["cpu"]["brand"].replace(" ", "_") + break + + tag = [ + t.name + for t in _repo.tags + if meta[0]["milabench"]["tag"].startswith(t.name) + ][0] + reports_dir = XPath(reports_repo.working_tree_dir) / tag + + run = XPath(run) + try: + run.copy(reports_dir / device / run.name) + except FileExistsError: + pass + + device_reports.setdefault((device, tag), set()) + device_reports[(device, tag)].update( + (reports_dir / device).glob("*/") + ) + + for (device, tag), reports in device_reports.items(): + reports_dir = XPath(reports_repo.working_tree_dir) / tag + reports = _read_reports(*reports) + reports = _filter_reports(*reports.values()) + summary = make_summary(reports) + + successes = [s["successes"] for s in summary.values()] + failures = [s["failures"] for s in summary.values()] + + if sum(successes) == 0: + text = "failure" + elif any(failures): + text = "partial" + else: + text = "pass" + + result = subprocess.run( + [ + sys.executable, + "-m", badges.__name__, + "--left-text", device, + "--right-text", text, + "--right-color", _SVG_COLORS[text], + "--whole-link", str(reports_url / tag / device) + ], + capture_output=True + ) + if result.returncode == 0: + (reports_dir / device / "badge.svg").write_text(result.stdout.decode("utf8")) + + with open(str(reports_dir / device / "README.md"), "wt") as _f: + _f.write("```\n") + make_report(summary, stream=_f) + _f.write("```\n") + + for cmd, _kwargs in ( + (["git", "pull"], {"check": True}), + (["git", "add", tag], {"check": True}), + (["git", "commit", "-m", tag], {"check": False}), + (["git", "push"], {"check": True}) + ): + subprocess.run( + cmd, + cwd=reports_repo.working_tree_dir, + **_kwargs + ) + + def _error_report(reports): out = {} for r, data in reports.items(): diff --git a/milabench/config.py b/milabench/config.py index bfee806e7..e276cb17c 100644 --- a/milabench/config.py +++ b/milabench/config.py @@ -1,6 +1,8 @@ import contextvars +import hashlib import os import socket +from copy import deepcopy import psutil import yaml @@ -57,6 +59,16 @@ def resolve_inheritance(bench_config, all_configs): return bench_config +def compute_config_hash(config): + config = deepcopy(config) + for entry in config: + config[entry]["dirs"] = {} + config[entry]["config_base"] = "" + config[entry]["config_file"] = "" + config[entry]["run_name"] = "" + return hashlib.md5(str(config).encode("utf8")).hexdigest() + + def finalize_config(name, bench_config): bench_config["name"] = name if "definition" in bench_config: @@ -76,6 +88,8 @@ def build_config(*config_files): for layer in _config_layers(config_files): all_configs = merge(all_configs, layer) + all_configs["*"]["hash"] = compute_config_hash(all_configs) + for name, bench_config in all_configs.items(): all_configs[name] = resolve_inheritance(bench_config, all_configs) diff --git a/milabench/multi.py b/milabench/multi.py index 9946a3642..c66d1716e 100644 --- a/milabench/multi.py +++ b/milabench/multi.py @@ -13,6 +13,7 @@ is_main_local, is_multinode, is_remote, + milabench_remote_config, milabench_remote_install, milabench_remote_prepare, milabench_remote_run, @@ -84,7 +85,10 @@ def setup_pack(self) -> Package: "dirs": pack.config["dirs"], "config_base": pack.config["config_base"], "config_file": pack.config["config_file"], + "plan": pack.config["plan"], "system": pack.config["system"], + "hash": pack.config["hash"], + "install_variant": pack.config["install_variant"], } ) @@ -121,6 +125,13 @@ async def do_install(self): remote_task = None if is_remote(setup): + await asyncio.wait( + [ + asyncio.create_task(t.execute()) + for t in milabench_remote_config(setup, self.packs) + ] + ) + # We are outside system, setup the main node first remote_plan = milabench_remote_install(setup, setup_for="main") remote_task = asyncio.create_task(remote_plan.execute()) @@ -142,6 +153,13 @@ async def do_prepare(self): remote_task = None if is_remote(setup): + await asyncio.wait( + [ + asyncio.create_task(t.execute()) + for t in milabench_remote_config(setup, self.packs) + ] + ) + remote_plan = milabench_remote_prepare(setup, run_for="main") remote_task = asyncio.create_task(remote_plan.execute()) await asyncio.wait([remote_task]) @@ -158,6 +176,13 @@ async def do_run(self, repeat=1): setup = self.setup_pack() if is_remote(setup): + await asyncio.wait( + [ + asyncio.create_task(t.execute()) + for t in milabench_remote_config(setup, self.packs) + ] + ) + # if we are not on the main node right now # ssh to the main node and launch milabench remote_plan = milabench_remote_run(setup) @@ -248,3 +273,11 @@ async def do_pin( pip_compile_args=pip_compile_args, constraints=new_constraints, ) + + async def do_purge_cloud(self): + setup = self.setup_pack() + + if is_remote(setup): + await asyncio.wait( + [asyncio.create_task(t.execute()) for t in self.manage_cloud(setup, action="teardown")] + ) diff --git a/milabench/remote.py b/milabench/remote.py index bf5963183..b1759f2fa 100644 --- a/milabench/remote.py +++ b/milabench/remote.py @@ -1,16 +1,22 @@ import os import sys +import yaml + +from milabench.fs import XPath + +from . import ROOT_FOLDER from .commands import ( CmdCommand, Command, ListCommand, + SCPCommand, SequenceCommand, SSHCommand, VoidCommand, ) -INSTALL_FOLDER = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +INSTALL_FOLDER = str(ROOT_FOLDER) def scp(node, folder, dest=None) -> list: @@ -30,21 +36,41 @@ def scp(node, folder, dest=None) -> list: ] -def rsync(node, folder, dest=None) -> list: +def rsync(node, src=None, remote_src=None, dest=None) -> list: """Copy a folder from local node to remote node""" host = node["ip"] user = node["user"] + key = node.get("key", None) + key = f"-i{key}" if key else "" + + if isinstance(src, str): + src = [src] + + assert not src or not remote_src + assert src or remote_src if dest is None: - dest = os.path.abspath(os.path.join(folder, "..")) + _ = remote_src if remote_src else src[0] + dest = os.path.abspath(os.path.join(_, "..")) + + if remote_src: + remote_src = [f"{user}@{host}:{remote_src}"] + src = [] + else: + dest = f"{user}@{host}:{dest}" + remote_src = [] return [ "rsync", + "--force", "-av", "-e", - "ssh -oCheckHostIP=no -oStrictHostKeyChecking=no", - folder, - f"{user}@{host}:{dest}", + f"ssh {key} -oCheckHostIP=no -oStrictHostKeyChecking=no", + "--include=*/.git/*", + *[f"--exclude=*/{_dir}/*" + for _dir in (".*", "venv", "env", "tmp")], + *src, *remote_src, + dest, ] @@ -84,9 +110,9 @@ def milabench_remote_setup_plan(pack, setup_for="worker") -> SequenceCommand: """ nodes = pack.config["system"]["nodes"] - copy = [] node_packs = [] + copy = [] for node in nodes: node_pack = None @@ -107,6 +133,30 @@ def milabench_remote_setup_plan(pack, setup_for="worker") -> SequenceCommand: ) +def milabench_remote_fetch_reports_plan(pack, run_for="main") -> SequenceCommand: + """Copy milabench reports from remote + + Notes + ----- + Assume that the filesystem of remote node mirror local system. + """ + + nodes = pack.config["system"]["nodes"] + runs = pack.config["dirs"]["runs"] + + copy = [] + for node in nodes: + node_pack = None + + if should_run_for(node, run_for): + node_pack = worker_pack(pack, node) + copy.append(CmdCommand(node_pack, *rsync(node, remote_src=str(runs)))) + + return SequenceCommand( + ListCommand(*copy), + ) + + def worker_pack(pack, worker): if is_remote(pack): return pack.copy({}) @@ -131,7 +181,13 @@ def milabench_remote_command(pack, *command, run_for="worker") -> ListCommand: cmds.append( SSHCommand( - CmdCommand(worker_pack(pack, worker), "milabench", *command), + CmdCommand( + worker_pack(pack, worker), + "cd", f"{INSTALL_FOLDER}", "&&", + f"MILABENCH_CONFIG={pack.config['config_file']}", + f"MILABENCH_BASE={os.environ.get('MILABENCH_BASE', '')}", + "milabench", *command + ), host=host, user=user, key=key, @@ -175,6 +231,45 @@ def _sanity(pack, setup_for): assert is_remote(pack), "Only a remote node can setup the main node" +def milabench_remote_config(pack, packs): + config = {} + config_hash = pack.config["hash"] + config_file = XPath(pack.config["config_file"]) + config_file = config_file.with_name(f"{config_file.name}.{config_hash}") + pack.config["config_file"] = str(config_file) + for p in packs.values(): + config[p.config["name"]] = p.config + p.config["config_file"] = str(config_file) + config_file.write_text(yaml.dump(config)) + + for n in pack.config["system"]["nodes"]: + _cmds = [ + SSHCommand( + CmdCommand( + pack, + "(", "mkdir", "-p", str(ROOT_FOLDER.parent), pack.config["dirs"]["base"], ")", + "||", "(", "sudo", "mkdir", "-p", str(ROOT_FOLDER.parent), pack.config["dirs"]["base"], + "&&", "sudo", "chmod", "-R", "a+rwX", str(ROOT_FOLDER.parent), pack.config["dirs"]["base"], ")", + ), + n["ip"], + ), + SSHCommand( + CmdCommand( + pack, + "mkdir", "-p", str(config_file.parent), + ), + n["ip"], + ), + SCPCommand( + pack, + n["ip"], + str(config_file), + ), + ] + + yield SequenceCommand(*_cmds) + + def milabench_remote_install(pack, setup_for="worker") -> SequenceCommand: """Copy milabench code, install milabench, execute milabench install""" _sanity(pack, setup_for) @@ -183,9 +278,9 @@ def milabench_remote_install(pack, setup_for="worker") -> SequenceCommand: return VoidCommand(pack) argv = sys.argv[2:] - return SequenceCommand( milabench_remote_setup_plan(pack, setup_for), + milabench_remote_command(pack, "pin", *argv, run_for=setup_for), milabench_remote_command(pack, "install", *argv, run_for=setup_for), ) @@ -210,4 +305,7 @@ def milabench_remote_run(pack) -> Command: return VoidCommand(pack) argv = sys.argv[2:] - return milabench_remote_command(pack, "run", *argv) + return SequenceCommand( + milabench_remote_command(pack, "run", *argv, run_for="main"), + milabench_remote_fetch_reports_plan(pack, run_for="main"), + )