Skip to content

Commit

Permalink
Merge branch 'develop' into Issue984/switch-to-ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
leahh committed Jan 14, 2025
2 parents 9059ced + 8f10fe0 commit ba7df46
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 22 deletions.
75 changes: 69 additions & 6 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pathlib
import shutil
import subprocess
import getpass
import tarfile
import tempfile
import textwrap
Expand Down Expand Up @@ -79,15 +80,78 @@ def get_hostname():
return curr_hn


def set_backend_status(new_status):
    """Store new_status as the backend flag in the client database.

    The value is passed through unchanged to the database layer, so the
    caller decides what is recorded (an empty string is stored as-is).
    """
    bdb.connect_db(client_db, db_path()).info.set_backend_status(new_status)


def check_backend_status():
    """Return the backend flag currently stored in the client database."""
    return bdb.connect_db(client_db, db_path()).info.get_backend_status()


def reset_client_db():
    """Blank out the hostname and the backend flag kept in the client db."""
    # Hostname is cleared first, then the backend flag; both become "".
    for clear_field in (setup_hostname, set_backend_status):
        clear_field("")


def check_backend_jobs(start_hn, command=False):
    """Check whether the node recorded in the client db still appears in squeue.

    Runs ``squeue`` for the current user and compares the stored hostname
    against each line of node names it prints.

    start_hn: hostname the caller is running on; it is used in the warning
        text and, when the database is reset, stored as the new hostname.
    command: when true, every outcome ends in a warning and ``sys.exit(1)``.
        When false, the process exits only if the stored node is still
        listed; otherwise the client db is reset and start_hn is recorded.

    Raises subprocess.CalledProcessError if squeue returns a non-zero status
    (because of check=True).
    """
    squeue_cmd = ['squeue', '-u', getpass.getuser(), '-o', '%N', '-h']
    squeue = subprocess.run(squeue_cmd, text=True, check=True, stdout=subprocess.PIPE)

    # one entry per line of squeue output; matched verbatim against the stored name
    listed_nodes = squeue.stdout.splitlines()

    if get_hostname() in listed_nodes:
        # the stored node is still among this user's jobs
        if command:
            message = (f'beeflow was started on "{get_hostname()}" and you are trying to '
                       f'run a command on "{start_hn}".')
        else:
            message = (f'beeflow was started on compute node "{get_hostname()}" '
                       'and it is still running. ')
        warn(message)
        sys.exit(1)

    # beeflow was started on compute node but user no longer owns node
    if command:
        warn('beeflow has not been started!')
        sys.exit(1)

    warn('beeflow was started on a compute node (no longer owned by user) and '
         'not stopped correctly. ')
    warn("Resetting client database.")
    reset_client_db()
    setup_hostname(start_hn)  # record the current host in the client db


def check_db_flags(start_hn):
    """Inspect the client db for a beeflow instance recorded on another host.

    If a different hostname is stored and the backend flag is empty, warn and
    exit with status 1. If a different hostname is stored and the backend flag
    is "true", hand off to check_backend_jobs(start_hn). Otherwise do nothing.
    """
    def recorded_elsewhere(flag):
        # Truthy when a hostname is stored, it differs from start_hn, and the
        # stored backend flag equals `flag`.
        return (get_hostname()
                and get_hostname() != start_hn
                and check_backend_status() == flag)

    if recorded_elsewhere(""):
        warn(f'Error: beeflow is already running on "{get_hostname()}".')
        sys.exit(1)
    if recorded_elsewhere("true"):
        check_backend_jobs(start_hn)


def check_hostname(curr_hn):
"""Check current front end name matches the one beeflow was started on."""
db = bdb.connect_db(client_db, db_path())
start_hn = db.info.get_hostname()
if start_hn and curr_hn != start_hn: # noqa: don't use set instead
warn(f'beeflow was started on "{start_hn}" and you are trying to '
if get_hostname() and curr_hn != get_hostname() and check_backend_status() == "":
warn(f'beeflow was started on "{get_hostname()}" and you are trying to '
f'run a command on "{curr_hn}".')
if start_hn == "":
sys.exit(1)
elif get_hostname() and curr_hn != get_hostname() and check_backend_status() == "true":
check_backend_jobs(curr_hn, command=True)
if get_hostname() == "" and check_backend_status() == "":
warn('beeflow has not been started!')
sys.exit(1)


def error_exit(msg, include_caller=True):
Expand Down Expand Up @@ -704,5 +768,4 @@ def main():

# Ignore W0511: This allows us to have TODOs in the code
# Ignore R1732: Significant code restructuring required to fix
# Ignore R1714: Not using a set instead
# pylama:ignore=W0511,R1732
33 changes: 24 additions & 9 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,22 @@ def load_check_charliecloud():

def check_dependencies(backend=False):
"""Check for various dependencies in the environment."""
# Check if running on compute node under Slurm scheduler
# Note: The checks below rely on SLURM_JOB_NODELIST and apply only to the Slurm scheduler.
# Check if running on compute node under Slurm scheduler and --backend not specified
if not backend and os.environ.get('SLURM_JOB_NODELIST') is not None:
warn('Slurm job node detected! Beeflow should not be run on a compute node.')
warn('Slurm job node detected! Beeflow should not be run on a compute node. ')
warn(f'SLURM_JOB_NODELIST = {os.environ.get("SLURM_JOB_NODELIST")}')
bee_client.reset_client_db()
sys.exit(1)
# Check if running on front end with --backend flag specified
if backend and os.environ.get('SLURM_JOB_NODELIST') is None:
warn('Slurm node was not detected! Are you sure you are on a compute node? ')
warn("Please run 'beeflow core start' again without the --backend flag.")
bee_client.reset_client_db()
sys.exit(1)

# TODO: Add equivalent checks for the Flux scheduler

print('Checking dependencies...')
# Check for Charliecloud and its version
load_check_charliecloud()
Expand Down Expand Up @@ -396,13 +407,15 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
'-B', help='allow to run on a backend node')):
"""Start all BEE components."""
start_hn = socket.gethostname() # hostname when beeflow starts
if bee_client.get_hostname() == "":
if bee_client.get_hostname() == "" and bee_client.check_backend_status() == "":
bee_client.setup_hostname(start_hn) # add to client db
elif bee_client.get_hostname() != start_hn:
warn(f'Error: beeflow is already running on "{bee_client.get_hostname()}."')
sys.exit(1)
else:
time.sleep(10) # giving Slurm time to relinquish compute node, if necessary
bee_client.check_db_flags(start_hn)

if backend: # allow beeflow to run on backend node
check_dependencies(backend=True)
bee_client.set_backend_status("true") # add flag to db
else:
check_dependencies()
mgr = init_components()
Expand Down Expand Up @@ -450,9 +463,10 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
def status():
"""Check the status of beeflow and the components."""
status_hn = socket.gethostname() # hostname when beeflow core status returned
bee_client.check_hostname(status_hn)
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'status'})
if resp is None:
bee_client.check_hostname(status_hn)
# bee_client.check_hostname(status_hn) # need to remove since checked in line 465?
sys.exit(1)
print('beeflow components:')
for comp, stat in resp['components'].items():
Expand All @@ -472,6 +486,7 @@ def info():
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
stop_hn = socket.gethostname() # hostname when beeflow core stop returned
bee_client.check_hostname(stop_hn)
# Check workflow states; warn if there are active states, pause running workflows
workflow_list = bee_client.get_wf_list()
concern_states = {'Running', 'Initializing', 'Waiting'}
Expand All @@ -494,13 +509,13 @@ def stop(query='yes'):
bee_client.pause(wf_id)
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is None:
bee_client.check_hostname(stop_hn)
# bee_client.check_hostname(stop_hn) # need to remove since checked in line 487?
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
if query == "yes":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
bee_client.setup_hostname("")
bee_client.reset_client_db()


def archive_dir(dir_to_archive):
Expand Down
24 changes: 17 additions & 7 deletions beeflow/common/db/client_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,20 @@ def set_hostname(self, new_hostname):
def get_hostname(self):
    """Return the hostname stored in the info table.

    The SELECT has no WHERE clause; the first column of the row returned by
    bdb.getone is used.
    """
    stmt = "SELECT hostname FROM info"
    # Query once and return the value directly; no intermediate copies.
    return bdb.getone(self.db_file, stmt)[0]

def set_backend_status(self, status):
    """Write status into the backend column of the info table.

    The UPDATE has no WHERE clause, so it applies to every row in info.
    """
    bdb.run(self.db_file, "UPDATE info set backend=?", [status])

def get_backend_status(self):
    """Return the value of the backend column from the info table."""
    row = bdb.getone(self.db_file, "SELECT backend FROM info")
    return row[0]


class ClientDB:
"""Client database."""
Expand All @@ -38,13 +48,13 @@ def _init_tables(self):
"""Initialize the client table if it doesn't exist."""
info_stmt = """CREATE TABLE IF NOT EXISTS info (
id INTEGER PRIMARY KEY ASC,
hostname TEXT);"""
hostname TEXT,
backend TEXT);"""
if not bdb.table_exists(self.db_file, 'info'):
bdb.create_table(self.db_file, info_stmt)
# insert a new workflow into the database
stmt = """INSERT INTO info (hostname) VALUES(?);"""
tmp = ""
bdb.run(self.db_file, stmt, [tmp])
# initialize hostname and backend values
stmt = """INSERT INTO info (hostname, backend) VALUES(?,?);"""
bdb.run(self.db_file, stmt, ["", ""])

@property
def info(self):
Expand Down
6 changes: 6 additions & 0 deletions beeflow/tests/test_db_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def test_empty(temp_db):
db = temp_db

host_name = db.info.get_hostname()
backend_stat = db.info.get_backend_status()
assert host_name == ""
assert backend_stat == ""


def test_info(temp_db):
Expand All @@ -31,7 +33,11 @@ def test_info(temp_db):
db.info.set_hostname('front_end_name')
host_name = db.info.get_hostname()

db.info.set_backend_status('true')
backend_stat = db.info.get_backend_status()

assert host_name == 'front_end_name'
assert backend_stat == 'true'
# Ignore W0621: PyLama complains about redefining 'temp_db' from the outer
# scope. This is how pytest fixtures work.
# pylama:ignore=W0621

0 comments on commit ba7df46

Please sign in to comment.