Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix slurmrestd version function #953

Merged
merged 13 commits into from
Dec 3, 2024
24 changes: 2 additions & 22 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
will be started.
"""
import os
import re
import signal
import subprocess
import socket
Expand All @@ -19,7 +18,6 @@
import time
import importlib.metadata
from pathlib import Path
import packaging

import daemon
import typer
Expand All @@ -31,6 +29,7 @@
from beeflow.common import cli_connection
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils
import beeflow.common.worker.utils as worker_utils

from beeflow.common.deps import container_manager
from beeflow.common.deps import neo4j_manager
Expand Down Expand Up @@ -168,22 +167,6 @@ def need_slurmrestd():
and not bc.get('slurm', 'use_commands'))


def get_slurmrestd_version():
"""Get the newest slurmrestd version."""
resp = subprocess.run(["slurmrestd", "-s", "list"], check=True, stderr=subprocess.PIPE,
text=True).stderr
resp = resp.split("\n")
# Confirm slurmrestd format is the same
# If the slurmrestd list outputs has changed potentially something else has broken
if "Possible OpenAPI plugins" not in resp[0]:
print("Slurmrestd OpenAPI format has changed and things may break")
api_versions = [line.split('/')[1] for line in resp[1:] if re.search(r"openapi/v\d+\.\d+\.\d+",
line)]
# Sort the versions and grab the newest one
newest_api = sorted(api_versions, key=packaging.version.Version, reverse=True)[0]
return newest_api


def init_components():
"""Initialize the components and component manager."""
mgr = ComponentManager()
Expand Down Expand Up @@ -265,10 +248,7 @@ def start_slurm_restd():
"""Start BEESlurmRestD. Returns a Popen process object."""
bee_workdir = bc.get('DEFAULT', 'bee_workdir')
slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
openapi_version = bc.get('slurm', 'openapi_version')
if not openapi_version:
# Detect the newest version of the slurmrestd API
openapi_version = get_slurmrestd_version()
openapi_version = worker_utils.get_slurmrestd_version()
slurm_args = f'-s openapi/{openapi_version}'
# The following adds the db plugin we opted not to use for now
# slurm_args = f'-s openapi/{openapi_version},openapi/db{openapi_version}'
Expand Down
3 changes: 1 addition & 2 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,7 @@ def validate_chrun_opts(opts):
default=(shutil.which('slurmrestd') is None),
info='if set, use slurm cli commands instead of slurmrestd')
DEFAULT_SLURMRESTD_SOCK = join_path('/tmp', f'slurm_{USER}_{random.randint(1, 10000)}.sock')
VALIDATOR.option('slurm', 'openapi_version', default='v0.0.39',
info='openapi version to use for slurmrestd')

# Scheduler
VALIDATOR.section('scheduler', info='Scheduler configuration section.')
SCHEDULER_ALGORITHMS = ('fcfs', 'backfill', 'sjf')
Expand Down
4 changes: 3 additions & 1 deletion beeflow/common/worker/slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import requests

from beeflow.common import log as bee_logging
import beeflow.common.worker.utils as worker_utils
from beeflow.common.worker.worker import (Worker, WorkerError)
from beeflow.common import validation
from beeflow.common.worker.utils import get_state_sacct
Expand Down Expand Up @@ -154,8 +155,9 @@ def submit_task(self, task):
class SlurmrestdWorker(BaseSlurmWorker):
"""Worker class for when slurmrestd is available."""

def __init__(self, bee_workdir, openapi_version, **kwargs):
def __init__(self, bee_workdir, **kwargs):
"""Create a new Slurmrestd Worker object."""
openapi_version = worker_utils.get_slurmrestd_version()
super().__init__(bee_workdir=bee_workdir, **kwargs)
# Pull slurm socket configs from kwargs (Uses getpass.getuser() instead
# of os.getlogin() because of an issue with using getlogin() without a
Expand Down
20 changes: 20 additions & 0 deletions beeflow/common/worker/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Worker utility functions."""

import re
import subprocess
from packaging.version import Version

from beeflow.common.worker.worker import WorkerError
from beeflow.common import log as bee_logging

Expand Down Expand Up @@ -32,3 +35,20 @@ def parse_key_val(pair):
"""Parse the key-value pair separated by '='."""
i = pair.find('=')
return (pair[:i], pair[i + 1:])


def get_slurmrestd_version():
"""Get the newest slurmrestd version."""
resp = subprocess.run(["slurmrestd", "-s", "list"], check=True, stderr=subprocess.PIPE,
text=True).stderr
resp = resp.split("\n")
# Confirm slurmrestd format is the same
# If the slurmrestd list outputs has changed potentially something else has broken
if "Possible OpenAPI plugins" not in resp[0]:
print("Slurmrestd OpenAPI format has changed and things may break")
api_versions = [line.split('/')[1] for line in resp[1:] if re.search(r"openapi/v\d+\.\d+\.\d+",
line)]
# Sort the versions and grab the newest one
newest_api = sorted(api_versions, key=Version, reverse=True)[0]
print(f"Inferred slurmrestd version: {newest_api}")
return newest_api
3 changes: 2 additions & 1 deletion beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from beeflow.common import paths
from beeflow.common.connection import Connection
from beeflow.common.worker_interface import WorkerInterface
import beeflow.common.worker.utils as worker_utils


def db_path():
Expand Down Expand Up @@ -43,7 +44,7 @@ def worker_interface():
if wls == 'Slurm':
worker_kwargs['use_commands'] = bc.get('slurm', 'use_commands')
worker_kwargs['slurm_socket'] = paths.slurm_socket()
worker_kwargs['openapi_version'] = bc.get('slurm', 'openapi_version')
worker_kwargs['openapi_version'] = worker_utils.get_slurmrestd_version
return WorkerInterface(worker_class, **worker_kwargs)


Expand Down
15 changes: 10 additions & 5 deletions beeflow/tests/test_slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import subprocess
import os
import pytest
from beeflow.common.config_driver import BeeConfig as bc

import beeflow.common.worker.utils as worker_utils
from beeflow.common.worker_interface import WorkerInterface
from beeflow.common.worker.worker import WorkerError
from beeflow.common.worker.slurm_worker import SlurmWorker
Expand All @@ -15,7 +16,9 @@
# Timeout (seconds) for waiting on tasks
TIMEOUT = 150
# Extra slurmrestd arguments. This may be something to take on the command line
OPENAPI_VERSION = bc.get('slurm', 'openapi_version')
# Open API version just needs to be some arbitrary version
# since this tests doesn't actually run with slurmrestd

GOOD_TASK = Task(name='good-task', base_command=['sleep', '3'], hints=[],
requirements=[], inputs=[], outputs=[], stdout='', stderr='',
workflow_id=uuid.uuid4().hex)
Expand Down Expand Up @@ -44,12 +47,13 @@ def slurm_worker(request):
slurm_socket = f'/tmp/{uuid.uuid4().hex}.sock'
bee_workdir = os.path.expanduser(f'/tmp/{uuid.uuid4().hex}.tmp')
os.mkdir(bee_workdir)
proc = subprocess.Popen(f'slurmrestd -s openapi/{OPENAPI_VERSION} unix:{slurm_socket}',
openapi_version = worker_utils.get_slurmrestd_version()
proc = subprocess.Popen(f'slurmrestd -s openapi/{openapi_version} unix:{slurm_socket}',
shell=True)
time.sleep(1)
worker_iface = WorkerInterface(worker=SlurmWorker, container_runtime='Charliecloud',
slurm_socket=slurm_socket, bee_workdir=bee_workdir,
openapi_version=OPENAPI_VERSION,
openapi_version=openapi_version,
use_commands=request.param)
yield worker_iface
time.sleep(1)
Expand All @@ -63,9 +67,10 @@ def slurmrestd_worker_no_daemon():
slurm_socket = f'/tmp/{uuid.uuid4().hex}.sock'
bee_workdir = os.path.expanduser(f'/tmp/{uuid.uuid4().hex}.tmp')
os.mkdir(bee_workdir)
openapi_version = worker_utils.get_slurmrestd_version()
yield WorkerInterface(worker=SlurmWorker, container_runtime='Charliecloud',
slurm_socket=slurm_socket, bee_workdir=bee_workdir,
openapi_version=OPENAPI_VERSION,
openapi_version=openapi_version,
use_commands=False)
shutil.rmtree(bee_workdir)

Expand Down
2 changes: 0 additions & 2 deletions ci/bee_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ Slurmrestd)
cat >> $BEE_CONFIG <<EOF
[slurm]
use_commands = False
openapi_version = $OPENAPI_VERSION
EOF
;;
SlurmCommands)
cat >> $BEE_CONFIG <<EOF
[slurm]
use_commands = True
openapi_version = $OPENAPI_VERSION
EOF
;;
esac
Expand Down
Loading