diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py
index 50ca17a9..2a6a798a 100644
--- a/beeflow/client/bee_client.py
+++ b/beeflow/client/bee_client.py
@@ -11,6 +11,7 @@
 import pathlib
 import shutil
 import subprocess
+import getpass
 import tarfile
 import tempfile
 import textwrap
@@ -79,15 +80,76 @@ def get_hostname():
     return curr_hn
 
 
+def set_backend_status(new_status):
+    """Set the backend flag in the client database."""
+    db = bdb.connect_db(client_db, db_path())
+    db.info.set_backend_status(new_status)
+
+
+def check_backend_status():
+    """Check if backend flag has been set."""
+    db = bdb.connect_db(client_db, db_path())
+    status = db.info.get_backend_status()
+    return status
+
+
+def reset_client_db():
+    """Reset client db when beeflow is stopped."""
+    setup_hostname("")
+    set_backend_status("")
+
+
+def check_backend_jobs(start_hn, command=False):
+    """Check if there is an instance of beeflow running on a backend node."""
+    user_name = getpass.getuser()
+    cmd = ['squeue', '-u', f'{user_name}', '-o', '%N', '-h']
+    resp = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE)
+
+    # squeue prints one node list per job owned by the user
+    data = resp.stdout.splitlines()
+    cur_alloc = get_hostname() in data
+
+    if cur_alloc:
+        if command:
+            warn(f'beeflow was started on "{get_hostname()}" and you are trying to '
+                 f'run a command on "{start_hn}".')
+            sys.exit(1)
+        else:
+            warn(f'beeflow was started on compute node "{get_hostname()}" '
+                 'and it is still running. ')
+            sys.exit(1)
+    else:  # beeflow was started on compute node but user no longer owns node
+        if command:
+            warn('beeflow has not been started!')
+            sys.exit(1)
+        else:
+            warn('beeflow was started on a compute node (no longer owned by user) and '
+                 'not stopped correctly. ')
+            warn("Resetting client database.")
+            reset_client_db()
+            setup_hostname(start_hn)  # add to client db
+
+
+def check_db_flags(start_hn):
+    """Check that beeflow was stopped correctly during the last run."""
+    if get_hostname() and get_hostname() != start_hn and check_backend_status() == "":
+        warn(f'Error: beeflow is already running on "{get_hostname()}".')
+        sys.exit(1)
+    if get_hostname() and get_hostname() != start_hn and check_backend_status() == "true":
+        check_backend_jobs(start_hn)
+
+
 def check_hostname(curr_hn):
     """Check current front end name matches the one beeflow was started on."""
-    db = bdb.connect_db(client_db, db_path())
-    start_hn = db.info.get_hostname()
-    if start_hn and curr_hn != start_hn:  # noqa: don't use set instead
-        warn(f'beeflow was started on "{start_hn}" and you are trying to '
+    if get_hostname() and curr_hn != get_hostname() and check_backend_status() == "":
+        warn(f'beeflow was started on "{get_hostname()}" and you are trying to '
              f'run a command on "{curr_hn}".')
-    if start_hn == "":
+        sys.exit(1)
+    elif get_hostname() and curr_hn != get_hostname() and check_backend_status() == "true":
+        check_backend_jobs(curr_hn, command=True)
+    if get_hostname() == "" and check_backend_status() == "":
         warn('beeflow has not been started!')
+        sys.exit(1)
 
 
 def error_exit(msg, include_caller=True):
@@ -704,5 +766,4 @@ def main():
 
 # Ignore W0511: This allows us to have TODOs in the code
 # Ignore R1732: Significant code restructuring required to fix
-# Ignore R1714: Not using a set instead
 # pylama:ignore=W0511,R1732
diff --git a/beeflow/client/core.py b/beeflow/client/core.py
index 229b0c34..483c8054 100644
--- a/beeflow/client/core.py
+++ b/beeflow/client/core.py
@@ -305,11 +305,22 @@ def load_check_charliecloud():
 
 def check_dependencies(backend=False):
     """Check for various dependencies in the environment."""
-    # Check if running on compute node under Slurm scheduler
+    # Note: the checks below, up to the Flux TODO, apply to the Slurm scheduler only.
+    # Check if running on compute node under Slurm scheduler and --backend not specified
     if not backend and os.environ.get('SLURM_JOB_NODELIST') is not None:
-        warn('Slurm job node detected! Beeflow should not be run on a compute node.')
+        warn('Slurm job node detected! Beeflow should not be run on a compute node. ')
         warn(f'SLURM_JOB_NODELIST = {os.environ.get("SLURM_JOB_NODELIST")}')
+        bee_client.reset_client_db()
         sys.exit(1)
+    # Check if running on front end with --backend flag specified
+    if backend and os.environ.get('SLURM_JOB_NODELIST') is None:
+        warn('Slurm node was not detected! Are you sure you are on a compute node? ')
+        warn("Please run 'beeflow core start' again without the --backend flag.")
+        bee_client.reset_client_db()
+        sys.exit(1)
+
+    # TODO: Add checks when using Flux scheduler
+
     print('Checking dependencies...')
     # Check for Charliecloud and its version
     load_check_charliecloud()
@@ -396,13 +407,15 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
           '-B', help='allow to run on a backend node')):
     """Start all BEE components."""
     start_hn = socket.gethostname()  # hostname when beeflow starts
-    if bee_client.get_hostname() == "":
+    if bee_client.get_hostname() == "" and bee_client.check_backend_status() == "":
         bee_client.setup_hostname(start_hn)  # add to client db
-    elif bee_client.get_hostname() != start_hn:
-        warn(f'Error: beeflow is already running on "{bee_client.get_hostname()}."')
-        sys.exit(1)
+    else:
+        time.sleep(10)  # giving Slurm time to relinquish compute node, if necessary
+        bee_client.check_db_flags(start_hn)
+
     if backend:  # allow beeflow to run on backend node
         check_dependencies(backend=True)
+        bee_client.set_backend_status("true")  # add flag to db
     else:
         check_dependencies()
     mgr = init_components()
@@ -450,9 +463,9 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
 def status():
     """Check the status of beeflow and the components."""
     status_hn = socket.gethostname()  # hostname when beeflow core status returned
+    bee_client.check_hostname(status_hn)
     resp = cli_connection.send(paths.beeflow_socket(), {'type': 'status'})
     if resp is None:
-        bee_client.check_hostname(status_hn)
         sys.exit(1)
     print('beeflow components:')
     for comp, stat in resp['components'].items():
@@ -472,6 +485,7 @@ def info():
 def stop(query='yes'):
     """Stop the current running beeflow daemon."""
     stop_hn = socket.gethostname()  # hostname when beeflow core stop returned
+    bee_client.check_hostname(stop_hn)
     # Check workflow states; warn if there are active states, pause running workflows
     workflow_list = bee_client.get_wf_list()
     concern_states = {'Running', 'Initializing', 'Waiting'}
@@ -494,13 +508,12 @@ def stop(query='yes'):
             bee_client.pause(wf_id)
     resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
     if resp is None:
-        bee_client.check_hostname(stop_hn)
         sys.exit(1)
     # As long as it returned something, we should be good
     beeflow_log = paths.log_fname('beeflow')
     if query == "yes":
         print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
-    bee_client.setup_hostname("")
+    bee_client.reset_client_db()
 
 
 def archive_dir(dir_to_archive):
diff --git a/beeflow/common/db/client_db.py b/beeflow/common/db/client_db.py
index 2bb837ba..2443e6ab 100644
--- a/beeflow/common/db/client_db.py
+++ b/beeflow/common/db/client_db.py
@@ -21,10 +21,20 @@ def set_hostname(self, new_hostname):
     def get_hostname(self):
         """Return hostname for current front end."""
         stmt = "SELECT hostname FROM info"
-        result = bdb.getone(self.db_file, stmt)[0]
-        hostname = result
+        hostname = bdb.getone(self.db_file, stmt)[0]
         return hostname
 
+    def set_backend_status(self, status):
+        """Set backend flag status: true (running on backend)."""
+        stmt = "UPDATE info set backend=?"
+        bdb.run(self.db_file, stmt, [status])
+
+    def get_backend_status(self):
+        """Return if backend flag is set to true or empty."""
+        stmt = "SELECT backend FROM info"
+        status = bdb.getone(self.db_file, stmt)[0]
+        return status
+
 
 class ClientDB:
     """Client database."""
@@ -38,13 +48,13 @@ def _init_tables(self):
         """Initialize the client table if it doesn't exist."""
         info_stmt = """CREATE TABLE IF NOT EXISTS info (
                            id INTEGER PRIMARY KEY ASC,
-                           hostname TEXT);"""
+                           hostname TEXT,
+                           backend TEXT);"""
         if not bdb.table_exists(self.db_file, 'info'):
             bdb.create_table(self.db_file, info_stmt)
-            # insert a new workflow into the database
-            stmt = """INSERT INTO info (hostname) VALUES(?);"""
-            tmp = ""
-            bdb.run(self.db_file, stmt, [tmp])
+            # initialize hostname and backend values
+            stmt = """INSERT INTO info (hostname, backend) VALUES(?,?);"""
+            bdb.run(self.db_file, stmt, ["", ""])
 
     @property
     def info(self):
diff --git a/beeflow/tests/test_db_client.py b/beeflow/tests/test_db_client.py
index 50220446..e6e6e251 100644
--- a/beeflow/tests/test_db_client.py
+++ b/beeflow/tests/test_db_client.py
@@ -21,7 +21,9 @@ def test_empty(temp_db):
     db = temp_db
 
     host_name = db.info.get_hostname()
+    backend_stat = db.info.get_backend_status()
     assert host_name == ""
+    assert backend_stat == ""
 
 
 def test_info(temp_db):
@@ -31,7 +33,11 @@ def test_info(temp_db):
     db.info.set_hostname('front_end_name')
     host_name = db.info.get_hostname()
 
+    db.info.set_backend_status('true')
+    backend_stat = db.info.get_backend_status()
+
     assert host_name == 'front_end_name'
+    assert backend_stat == 'true'
 # Ignore W0621: PyLama complains about redefining 'temp_db' from the outer
 # scope. This is how pytest fixtures work.
 # pylama:ignore=W0621