Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue883/add check for different front end #933

Merged
merged 19 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""bee-client.

This script provides an client interface to the user to manage workflows.
This script provides a client interface to the user to manage workflows.
Capablities include submitting, starting, listing, pausing and cancelling workflows.
"""
import os
Expand Down Expand Up @@ -29,6 +29,8 @@
from beeflow.common.wf_data import generate_workflow_id
from beeflow.client import core
from beeflow.wf_manager.resources import wf_utils
from beeflow.common.db import client_db
from beeflow.common.db import bdb

# Length of a shortened workflow ID
short_id_len = 6 #noqa: Not a constant
Expand All @@ -53,6 +55,41 @@ def __init__(self, *args):
self.args = args


def warn(*pargs):
"""Print a red warning message."""
typer.secho(' '.join(pargs), fg=typer.colors.RED, file=sys.stderr)


def db_path():
"""Return the client database path."""
bee_workdir = config_driver.BeeConfig.get('DEFAULT', 'bee_workdir')
return os.path.join(bee_workdir, 'client.db')


def setup_hostname(start_hn):
"""Set up front end name when beeflow core start is returned."""
db = bdb.connect_db(client_db, db_path())
db.info.set_hostname(start_hn)


def get_hostname():
"""Check if beeflow is running somewhere else."""
db = bdb.connect_db(client_db, db_path())
curr_hn = db.info.get_hostname()
return curr_hn


def check_hostname(curr_hn):
"""Check current front end name matches the one beeflow was started on."""
db = bdb.connect_db(client_db, db_path())
start_hn = db.info.get_hostname()
if start_hn and curr_hn != start_hn: # noqa: don't use set instead
warn(f'beeflow was started on "{start_hn}" and you are trying to '
kchilleri marked this conversation as resolved.
Show resolved Hide resolved
f'run a command on "{curr_hn}".')
if start_hn == "":
warn('beeflow has not been started!')


def error_exit(msg, include_caller=True):
"""Print a message and exit or raise an error with that message."""
if include_caller:
Expand Down Expand Up @@ -115,7 +152,11 @@ def get_wf_list():
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if get_hostname() == "":
warn('beeflow has not been started!')
sys.exit(1)
else:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')
Expand Down Expand Up @@ -680,4 +721,5 @@ def main():

# Ignore W0511: This allows us to have TODOs in the code
# Ignore R1732: Significant code restructuring required to fix
# Ignore R1714: Not using a set instead
# pylama:ignore=W0511,R1732
21 changes: 14 additions & 7 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
help='run in the foreground'), backend: bool = typer.Option(False, '--backend',
'-B', help='allow to run on a backend node')):
"""Start all BEE components."""
start_hn = socket.gethostname() # hostname when beeflow starts
if bee_client.get_hostname() == "":
bee_client.setup_hostname(start_hn) # add to client db
elif bee_client.get_hostname() != start_hn:
warn(f'Error: beeflow is already running on "{bee_client.get_hostname()}."')
sys.exit(1)
if backend: # allow beeflow to run on backend node
check_dependencies(backend=True)
else:
Expand Down Expand Up @@ -439,6 +445,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',

version = importlib.metadata.version("hpc-beeflow")
print(f'Starting beeflow {version}...')
print(f'Running beeflow on {start_hn}')
if not foreground:
print('Run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
Expand All @@ -457,11 +464,10 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
@app.command()
def status():
"""Check the status of beeflow and the components."""
status_hn = socket.gethostname() # hostname when beeflow core status returned
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'status'})
if resp is None:
beeflow_log = paths.log_fname('beeflow')
warn('Cannot connect to the beeflow daemon, is it running? Check the '
f'log at "{beeflow_log}".')
bee_client.check_hostname(status_hn)
sys.exit(1)
print('beeflow components:')
for comp, stat in resp['components'].items():
Expand All @@ -480,6 +486,7 @@ def info():
@app.command()
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
stop_hn = socket.gethostname() # hostname when beeflow core stop returned
# Check workflow states; warn if there are active states, pause running workflows
workflow_list = bee_client.get_wf_list()
concern_states = {'Running', 'Initializing', 'Waiting'}
Expand All @@ -502,15 +509,13 @@ def stop(query='yes'):
bee_client.pause(wf_id)
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is None:
beeflow_log = paths.log_fname('beeflow')
warn('Error: beeflow is not running on this system. It could be '
'running on a different front end.\n'
f' Check the beeflow log: "{beeflow_log}".')
bee_client.check_hostname(stop_hn)
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
if query == "yes":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
bee_client.setup_hostname("")


def archive_dir(dir_to_archive):
Expand Down Expand Up @@ -547,6 +552,8 @@ def reset(archive: bool = typer.Option(False, '--archive', '-a',
help='Archive bee_workdir before removal')):
"""Stop all components and delete the bee_workdir directory."""
# Check workflow states; warn if there are active states.
reset_hn = socket.gethostname()
bee_client.check_hostname(reset_hn)
workflow_list = bee_client.get_wf_list()
active_states = {'Running', 'Paused', 'Initializing', 'Waiting'}
caution = ""
Expand Down
57 changes: 57 additions & 0 deletions beeflow/common/db/client_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Client database code."""

from collections import namedtuple

from beeflow.common.db import bdb


class ClientInfo:
"""Client Info object."""

def __init__(self, db_file):
"""Initialize info and db file."""
self.Info = namedtuple("Info", "id hostname") # noqa Snake Case
self.db_file = db_file

def set_hostname(self, new_hostname):
"""Set hostname for current front end."""
stmt = "UPDATE info set hostname=?"
bdb.run(self.db_file, stmt, [new_hostname])

def get_hostname(self):
"""Return hostname for current front end."""
stmt = "SELECT hostname FROM info"
result = bdb.getone(self.db_file, stmt)[0]
hostname = result
return hostname


class ClientDB:
"""Client database."""

def __init__(self, db_file):
"""Construct a new client database connection."""
self.db_file = db_file
self._init_tables()

def _init_tables(self):
"""Initialize the client table if it doesn't exist."""
info_stmt = """CREATE TABLE IF NOT EXISTS info (
id INTEGER PRIMARY KEY ASC,
hostname TEXT);"""
if not bdb.table_exists(self.db_file, 'info'):
bdb.create_table(self.db_file, info_stmt)
# insert a new workflow into the database
stmt = """INSERT INTO info (hostname) VALUES(?);"""
tmp = ""
bdb.run(self.db_file, stmt, [tmp])

@property
def info(self):
"""Get info from the database."""
return ClientInfo(self.db_file)


def open_db(db_file):
"""Open and return a new database."""
return ClientDB(db_file)
37 changes: 37 additions & 0 deletions beeflow/tests/test_db_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Tests of the client database."""
import tempfile
import os

import pytest

from beeflow.common.db import client_db


@pytest.fixture
def temp_db():
"""Create a fixture for making a temporary database."""
fname = tempfile.mktemp()
db = client_db.open_db(fname)
yield db
os.remove(fname)


def test_empty(temp_db):
"""Test the empty database."""
db = temp_db

host_name = db.info.get_hostname()
assert host_name == ""


def test_info(temp_db):
"""Test setting the info."""
db = temp_db

db.info.set_hostname('front_end_name')
host_name = db.info.get_hostname()

assert host_name == 'front_end_name'
# Ignore W0621: PyLama complains about redefining 'temp_db' from the outer
# scope. This is how pytest fixtures work.
# pylama:ignore=W0621