From 7209c33fb568fff23f777fe6296198e2d7b0ec48 Mon Sep 17 00:00:00 2001
From: Setepenre <pierre.delaunay.tr@gmail.com>
Date: Thu, 6 Jul 2023 16:40:51 -0400
Subject: [PATCH] Docker and executors (#160)

---
 benchmarks/accelerate_opt/benchfile.py | 58 +++++++------------
 docker/Dockerfile-cuda                 |  6 +-
 docs/docker.rst                        | 24 ++++----
 milabench/cli.py                       |  4 +-
 milabench/executors.py                 | 77 ++++++++++++++++++++++----
 milabench/multi.py                     |  4 +-
 milabench/remote.py                    |  2 +
 milabench/utils.py                     |  6 +-
 8 files changed, 110 insertions(+), 71 deletions(-)

diff --git a/benchmarks/accelerate_opt/benchfile.py b/benchmarks/accelerate_opt/benchfile.py
index 675930df6..c886e8f5f 100644
--- a/benchmarks/accelerate_opt/benchfile.py
+++ b/benchmarks/accelerate_opt/benchfile.py
@@ -3,10 +3,7 @@
     CmdExecutor,
     DockerRunExecutor,
     ListExecutor,
-    SCPExecutor,
     SSHExecutor,
-    SequenceExecutor,
-    VoidExecutor,
 )
 from milabench.pack import Package
 from milabench.utils import select_nodes
@@ -20,47 +17,29 @@ def make_env(self):
         env["OMP_NUM_THREADS"] = str(self.config["argv"]["--cpus_per_gpu"])
         return env
 
-    def build_docker_prepare_remote_plan(self):
-        executors = []
-        docker_pull_exec = CmdExecutor(
-            self, "docker", "pull", self.config["system"].get("docker_image", None)
-        )
-        for node in self.config["system"]["nodes"]:
-            if node["main"]:
-                continue
-            executors.append(SSHExecutor(docker_pull_exec, node["ip"]))
-        return ListExecutor(*executors)
-
     def build_prepare_plan(self):
-        prepare = [
-            CmdExecutor(
-                self,
-                "accelerate",
-                "launch",
-                "--mixed_precision=fp16",
-                "--num_machines=1",
-                "--dynamo_backend=no",
-                "--num_processes=1",
-                "--num_cpu_threads_per_process=8",
-                str(self.dirs.code / "main.py"),
-                *self.argv,
-                "--prepare_only",
-                "--cache",
-                str(self.dirs.cache)
-            )
-        ]
-        
-        docker_image = self.config["system"].get("docker_image", None)
-        if docker_image:
-            prepare.append(self.build_docker_prepare_remote_plan())
-
-        return SequenceExecutor(*prepare)
+        return CmdExecutor(
+            self,
+            "accelerate",
+            "launch",
+            "--mixed_precision=fp16",
+            "--num_machines=1",
+            "--dynamo_backend=no",
+            "--num_processes=1",
+            "--num_cpu_threads_per_process=8",
+            str(self.dirs.code / "main.py"),
+            *self.argv,
+            "--prepare_only",
+            "--cache",
+            str(self.dirs.cache)
+        )
 
     def build_run_plan(self):
         plans = []
-        
+
         max_num = self.config["num_machines"]
         nodes = select_nodes(self.config["system"]["nodes"], max_num)
+        key = self.config["system"].get("sshkey")
 
         for rank, node in enumerate(nodes):
             host = node["ip"]
@@ -72,7 +51,7 @@ def build_run_plan(self):
                     setsid=True,
                     use_stdout=True,
                 )
-                
+
             tags = [*self.config["tag"], node["name"]]
             if rank != 0:
                 # Workers do not send training data
@@ -83,6 +62,7 @@ def build_run_plan(self):
             worker = SSHExecutor(
                 host=host,
                 user=user,
+                key=key,
                 executor=DockerRunExecutor(
                     AccelerateLaunchExecutor(pack, rank=rank),
                     self.config["system"].get("docker_image"),
diff --git a/docker/Dockerfile-cuda b/docker/Dockerfile-cuda
index eaad2dae5..ebbc4a460 100644
--- a/docker/Dockerfile-cuda
+++ b/docker/Dockerfile-cuda
@@ -27,7 +27,6 @@ ENV MILABENCH_ARGS=""
 WORKDIR /milabench
 COPY . /milabench/milabench/
 
-
 # Install Dependencies
 # --------------------
 
@@ -40,6 +39,7 @@ COPY . /milabench/milabench/
 # Use ofed_info -s to get your local version
 ARG MOFED_VERSION=5.4-3.4.0.0
 
+ENV DEBIAN_FRONTEND=noninteractive
 RUN apt-get update -y &&\
     apt-get install -y git build-essential curl python3 python-is-python3 python3-pip &&\
     curl -o /etc/apt/trusted.gpg.d/mellanox.asc https://content.mellanox.com/ofed/RPM-GPG-KEY-Mellanox &&\
@@ -47,7 +47,7 @@ RUN apt-get update -y &&\
     curl -o cuda-keyring_1.1-1_all.deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb &&\
     dpkg -i cuda-keyring_1.1-1_all.deb &&\
     apt-get update -y &&\
-    apt-get install -y libibverbs1 cuda-compat-11-8 &&\
+    apt-get install -y libibverbs1 nvidia-compute-utils-535 nvidia-utils-535 cuda-11-8 &&\
     apt-get clean &&\
     rm -rf /var/lib/apt/lists/* &&\
     rm cuda-keyring_1.1-1_all.deb
@@ -56,7 +56,7 @@ RUN apt-get update -y &&\
 
 RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
 ENV PATH="/root/.cargo/bin:${PATH}"
-
+ENV CUDA_HOME=/usr/local/cuda-11.8
 
 # Install Milabench
 # -----------------
diff --git a/docs/docker.rst b/docs/docker.rst
index 09aa3e738..582ca95a6 100644
--- a/docs/docker.rst
+++ b/docs/docker.rst
@@ -97,13 +97,14 @@ There are currently two multi-node benchmarks, ``opt-1_3b-multinode`` (data-para
 ``opt-6_7b-multinode`` (model-parallel, that model is too large to fit on a single GPU). Here is how to run them:
 
 0. Make sure the machine can ssh between each other without passwords
-  - ``ssh-keygen``
 1. Pull the milabench docker image you would like to run on all machines
   - ``docker pull``
-2. Create a list of nodes that will participate in the benchmark inside a ``system.yaml`` file (see example below)
-  - ``vi system.yaml``
+1. Create the output directory
+  - ``mkdir -p results``
+2. Create a list of nodes that will participate in the benchmark inside a ``results/system.yaml`` file (see example below)
+  - ``vi results/system.yaml``
 3. Call milabench with by specifying the node list we created.
-  - ``docker ...-v <privatekey>:/milabench/id_milabench milabench run ... --system system.yaml``
+  - ``docker ... -v $(pwd)/results:/milabench/envs/runs -v <privatekey>:/milabench/id_milabench milabench run ... --system /milabench/envs/runs/system.yaml``
 
 .. notes::
 
@@ -112,7 +113,9 @@ There are currently two multi-node benchmarks, ``opt-1_3b-multinode`` (data-para
 .. code-block:: yaml
 
    system:
-     docker-image: ghcr.io/mila-iqia/milabench:${system.arch}-nightly
+     sshkey: <privatekey>
+     arch: cuda
+     docker_image: ghcr.io/mila-iqia/milabench:${system.arch}-nightly
 
      nodes:
        - name: node1
@@ -135,12 +138,12 @@ Then, the command should look like this:
 
     # Change if needed
     export SSH_KEY_FILE=$HOME/.ssh/id_rsa
-
+    export MILABENCH_IMAGE=ghcr.io/mila-iqia/milabench:cuda-nightly
     docker run -it --rm --gpus all --network host --ipc=host --privileged \
       -v $SSH_KEY_FILE:/milabench/id_milabench \
       -v $(pwd)/results:/milabench/envs/runs \
       $MILABENCH_IMAGE \
-      milabench run --system system.yaml \
+      milabench run --system /milabench/envs/runs/system.yaml \
       --select multinode
 
 The last line (``--select multinode``) specifically selects the multi-node benchmarks. Omit that line to run all benchmarks.
@@ -161,14 +164,15 @@ For example, for 4 nodes:
 .. code-block:: yaml
 
    system:
-     docker-image: ghcr.io/mila-iqia/milabench:${system.arch}-nightly
+     arch: cuda
+     docker_image: ghcr.io/mila-iqia/milabench:${system.arch}-nightly
 
      nodes:
        - name: node1
          ip: 192.168.0.25
          main: true
          port: 8123
-         user: delaunap
+         user: <username>
       
        - name: node2
          ip: 192.168.0.26
@@ -190,7 +194,7 @@ The command would look like
 
 .. code-block:: bash
 
-   docker ... milabench run ... --system system.yaml --overrides overrides.yaml
+   docker ... milabench run ... --system /milabench/envs/runs/system.yaml --overrides /milabench/envs/runs/overrides.yaml
 
 
 .. note::
diff --git a/milabench/cli.py b/milabench/cli.py
index 4859db33c..d163114ff 100644
--- a/milabench/cli.py
+++ b/milabench/cli.py
@@ -354,7 +354,7 @@ def run():
 
         # Which type of dashboard to show (short, long, or no)
         dash: Option & str = os.environ.get("MILABENCH_DASH", "long")
-        
+
         noterm: Option & bool = os.getenv("MILABENCH_NOTERM", "0") == "1"
 
         validations: Option & str = None
@@ -376,7 +376,7 @@ def run():
         success = run_with_loggers(
             mp.do_run(repeat=repeat),
             loggers=[
-                # Terminal Formatter slows down the dashboard, 
+                # Terminal Formatter slows down the dashboard,
                 # if lots of info needs to be printed
                 # in particular rwkv
                 TerminalFormatter() if not noterm else None,
diff --git a/milabench/executors.py b/milabench/executors.py
index 241ce0010..e7334de04 100644
--- a/milabench/executors.py
+++ b/milabench/executors.py
@@ -55,6 +55,8 @@ class Executor:
     def __init__(self, pack_or_exec: Executor | pack.BasePackage, **kwargs) -> None:
         self._pack = None
         self.exec = None
+        # used to know if the command is executed through SSH or locally
+        self.remote = False
 
         if isinstance(pack_or_exec, Executor):
             self.exec = pack_or_exec
@@ -83,6 +85,12 @@ def _set_pack(self, pack):
 
         return False
 
+    def packs(self):
+        if self.pack:
+            yield self.pack
+        else:
+            yield from self.exec.packs()
+
     def copy(self, pack):
         """Copy the execution plan but use a different pack"""
         copy = deepcopy(self)
@@ -108,24 +116,30 @@ def commands(self) -> Generator[Tuple[pack.BasePackage, List, Dict], None, None]
         """
         yield self.pack, [], self.kwargs()
 
-    async def execute(self, timeout=False, timeout_delay=600, **kwargs):
+    async def execute(self, phase="run", timeout=False, timeout_delay=600, **kwargs):
         """Execute all the commands and return the aggregated results"""
         coro = []
 
+        for pack in self.packs():
+            pack.phase = phase
+
         for pack, argv, _kwargs in self.commands():
             await pack.send(event="config", data=pack.config)
             await pack.send(event="meta", data=machine_metadata())
 
-            pack.phase = "run"
             fut = pack.execute(*argv, **{**_kwargs, **kwargs})
-
             coro.append(fut)
 
             if timeout:
                 delay = pack.config.get("max_duration", timeout_delay)
-                asyncio.create_task(force_terminate(pack, delay))
+                timeout_task = asyncio.create_task(force_terminate(pack, delay))
+
+        results = await asyncio.gather(*coro)
+
+        if timeout:
+            timeout_task.cancel()
 
-        return await asyncio.gather(*coro)
+        return results
 
 
 class SingleCmdExecutor(Executor):
@@ -169,6 +183,10 @@ def commands(self) -> Generator[Tuple[pack.BasePackage, List, Dict], None, None]
         for executor in self.executors:
             yield from executor.commands()
 
+    def packs(self):
+        for exec in self.executors:
+            yield from exec.packs()
+
 
 class CmdExecutor(SingleCmdExecutor):
     """Execute a command
@@ -298,8 +316,47 @@ def __init__(
         )
         self.image = image
 
+    def as_container_path(self, path):
+        # replace local output path with docker path
+        base = self.pack.dirs.base
+        path = path.replace(str(base), "/milabench/envs")
+
+        # Replace local installation path with docker path
+        install_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+        path = path.replace(str(install_path), "/milabench/milabench")
+
+        return path
+
+    def argv(self, **kwargs) -> List:
+        """Return the list of command line's arguments for this `Executor`
+        followed by its embedded `Executor`'s list of command line's arguments
+
+        Arguments:
+            **kwargs: some `Executor` might need an argument to dynamically
+                      generate the list of command line's arguments
+        """
+        script_args = self.exec.argv(**kwargs)
+        docker_args = self._argv(**kwargs)
+
+        # we are already inside docker; the paths are correct
+        if len(docker_args) == 0:
+            return script_args
+
+        # we are outside docker
+        rewritten = []
+        for arg in script_args:
+            # rewrite path to be inside docker
+            rewritten.append(self.as_container_path(arg))
+
+        return docker_args + rewritten
+
+    def is_inside_docker(self):
+        return os.environ.get("MILABENCH_DOCKER", None)
+
     def _argv(self, **kwargs) -> List:
-        if self.image is None or os.environ.get("MILABENCH_DOCKER", None):
+        # if the command is executed remotely it does not matter
+        # if we are inside docker or not
+        if (self.image is None) or (self.is_inside_docker() and not self.remote):
             # No-op when there's no docker image to run or inside a docker
             # container
             return []
@@ -307,14 +364,11 @@ def _argv(self, **kwargs) -> List:
         argv = super()._argv(**kwargs)
 
         env = self.pack.make_env()
-        for var in ("MILABENCH_CONFIG", "XDG_CACHE_HOME", "OMP_NUM_THREADS"):
+        for var in ("XDG_CACHE_HOME", "OMP_NUM_THREADS"):
             argv.append("--env")
-            argv.append(f"{var}='{env[var]}'")
+            argv.append(f"{var}='{self.as_container_path(env[var])}'")
 
         argv.append(self.image)
-        argv.append(f"{self.pack.dirs.code / 'activator'}")
-        argv.append(f"{self.pack.dirs.venv}")
-
         return argv
 
 
@@ -358,6 +412,7 @@ def __init__(
         self.user = user
         self.key = key
         self.port = port
+        executor.remote = not self.is_local()
 
     def _find_node_config(self) -> Dict:
         for n in self.pack.config["system"]["nodes"]:
diff --git a/milabench/multi.py b/milabench/multi.py
index 9c67a0ba7..5ff20036d 100644
--- a/milabench/multi.py
+++ b/milabench/multi.py
@@ -21,8 +21,6 @@
 
 here = XPath(__file__).parent
 
-gpus = get_gpu_info()["gpus"].values()
-
 planning_methods = {}
 
 
@@ -176,7 +174,7 @@ async def do_run(self, repeat=1):
                         continue
 
                     exec_plan = make_execution_plan(pack, index, repeat)
-                    await exec_plan.execute(timeout=True, timeout_delay=600)
+                    await exec_plan.execute("run", timeout=True, timeout_delay=600)
 
                 except Exception as exc:
                     import traceback
diff --git a/milabench/remote.py b/milabench/remote.py
index e26eba2bc..e2d03fe43 100644
--- a/milabench/remote.py
+++ b/milabench/remote.py
@@ -122,6 +122,7 @@ def worker_pack(pack, worker):
 
 def milabench_remote_command(pack, *command, run_for="worker") -> ListExecutor:
     nodes = pack.config["system"]["nodes"]
+    key = pack.config["system"].get("sshkey")
     cmds = []
 
     for worker in nodes:
@@ -134,6 +135,7 @@ def milabench_remote_command(pack, *command, run_for="worker") -> ListExecutor:
                     CmdExecutor(worker_pack(pack, worker), f"milabench", *command),
                     host=host,
                     user=user,
+                    key=key,
                 )
             )
 
diff --git a/milabench/utils.py b/milabench/utils.py
index 00f44c623..d7914eb5f 100644
--- a/milabench/utils.py
+++ b/milabench/utils.py
@@ -207,14 +207,14 @@ def multilogger(*logs, **kwargs):
 def select_nodes(nodes, n):
     """Select n nodes, main node is always first"""
     ranked = []
-    
+
     for node in nodes:
         if node["main"]:
             ranked.insert(0, node)
         else:
             ranked.append(node)
-    
-    return ranked[:max(1, min(n, len(ranked)))]
+
+    return ranked[: max(1, min(n, len(ranked)))]
 
 
 def enumerate_rank(nodes):