Skip to content

Commit

Permalink
adding the run id
Browse files Browse the repository at this point in the history
  • Loading branch information
PhillipsOwen committed Feb 26, 2024
1 parent 0cf44f2 commit a9bd38b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 30 deletions.
5 changes: 3 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
parser = ArgumentParser()

# declare the command params
parser.add_argument('--run_id', default=None, help='The run identifier.', type=str, required=True)
parser.add_argument('--run_dir', default=None, help='The name of the run directory to use for the staging operations.', type=str, required=True)
parser.add_argument('--step_type', default=None, help='The type of staging step, initial or final.', type=str, required=True)
parser.add_argument('--workflow_type', default='CORE', help='The type of workflow, CORE, TOPOLOGY, etc..', type=str, required=False)
Expand All @@ -41,7 +42,7 @@
ret_val: int = 0

# validate the inputs
if args.run_dir != '' and args.step_type != '' and args.workflow_type != '':
if args.run_id != '' and args.run_id != '' and args.step_type != '' and args.workflow_type != '':
try:
# check to make sure we got a legit staging and workflow types
# these will throw exceptions if they are not
Expand All @@ -57,7 +58,7 @@
# should we continue?
if ret_val == 0:
# do the staging
ret_val = stage_obj.run(args.run_dir, args.step_type, args.workflow_type)
ret_val = stage_obj.run(args.run_id, args.run_dir, args.step_type, args.workflow_type)

# exit with the final exit code
sys.exit(ret_val)
26 changes: 10 additions & 16 deletions src/staging/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,40 @@ def __init__(self):
# create a DB connection object
self.db_info: PGImplementation = PGImplementation(db_names, _logger=self.logger)

def run(self, run_dir: str, step_type: StagingType, workflow_type: WorkflowTypeName = WorkflowTypeName.CORE) -> int:
def run(self, run_id: str, run_dir: str, step_type: StagingType, workflow_type: WorkflowTypeName = WorkflowTypeName.CORE) -> int:
"""
Performs the requested type of staging operation.
The supervisor will mount the /data directory for this component by default.
:param workflow_type:
:param run_id: The id of the run.
:param run_dir: The base path of the directory to use for the staging operations.
:param step_type: The type of staging step, either 'initial' or 'final'
:param workflow_type:
:return:
"""
# init the return value
ret_val: int = ReturnCodes.EXIT_CODE_SUCCESS

# parse the run id differently in Windows environments
if sys.platform == 'win32':
# get the run ID
run_id: str = run_dir.split('\\')[-1]
else:
# get the run ID
run_id: str = run_dir.split('/')[-1]

# is this an initial stage step?
if step_type == StagingType.INITIAL_STAGING:
# make the call to perform the op
ret_val = self.initial_staging(run_dir, run_id, step_type, workflow_type)
ret_val = self.initial_staging(run_id, run_dir, step_type, workflow_type)
# else this a final stage step
elif step_type == StagingType.FINAL_STAGING:
# make the call to perform the op
ret_val = self.final_staging(run_dir, run_id, step_type)
ret_val = self.final_staging(run_id, run_dir, step_type)

# return to the caller
return ret_val

def initial_staging(self, run_dir: str, run_id: str, staging_type: StagingType, workflow_type: WorkflowTypeName) -> int:
def initial_staging(self, run_id: str, run_dir: str, staging_type: StagingType, workflow_type: WorkflowTypeName) -> int:
"""
Performs the initial staging
:param run_dir: The path of the directory to use for the staging operations.
:param run_id: The ID of the supervisor run request.
:param run_dir: The path of the directory to use for the staging operations.
:param staging_type: The type of staging step, either 'initial' or 'final'
:param workflow_type: the type of workflow
Expand All @@ -94,7 +87,8 @@ def initial_staging(self, run_dir: str, run_id: str, staging_type: StagingType,
# init the return code
ret_val: int = ReturnCodes.EXIT_CODE_SUCCESS

self.logger.info('Initial staging version %s start: run_dir: %s, workflow type: %s', self.app_version, run_dir, workflow_type)
self.logger.info('Initial staging version %s start: run_id: %s, run_dir: %s, workflow type: %s', self.app_version, run_id, run_dir,
workflow_type)

try:
# try to make the call for records
Expand Down Expand Up @@ -230,7 +224,7 @@ def create_test_files(self, run_dir: str, run_data: json, workflow_type: Workflo
# return to the caller
return ret_val

def final_staging(self, run_dir: str, run_id: str, staging_type: StagingType) -> int:
def final_staging(self, run_id: str, run_dir: str, staging_type: StagingType) -> int:
"""
Performs the initial staging
Expand Down
24 changes: 12 additions & 12 deletions src/tests/test_staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ def test_run():
run_id: str = '0'

# set up the test directory
run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id)
run_dir: str = os.path.join(os.getenv('TEST_PATH'), 'grp3')

# make the call to do an initial stage with an invalid run id
ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)
ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)

# ensure we got a failure code
assert ret_val == ReturnCodes.DB_ERROR
Expand All @@ -49,58 +49,58 @@ def test_run():
run_id: str = '3'

# set up the test directory
run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id)
run_dir: str = os.path.join(os.getenv('TEST_PATH'), 'grp3')

# make the call to do an initial stage
ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.TOPOLOGY)
ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.TOPOLOGY)

# make sure of a successful return code and a json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh'))

# make the call to do a final stage. this dir was created above so it should be removed
ret_val = staging.run(run_dir, StagingType.FINAL_STAGING)
ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING)

# make sure of a successful return code and a missing json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir)

# make the call to do an initial stage
ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)
ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)

# make sure of a successful return code and a json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh'))

# make the call to do a final stage. this is an invalid directory and should fail
ret_val = staging.run(os.path.join(os.getenv('TEST_PATH'), '0'), StagingType.FINAL_STAGING, WorkflowTypeName.CORE)
ret_val = staging.run(run_id, os.path.join(os.getenv('TEST_PATH'), '0'), StagingType.FINAL_STAGING, WorkflowTypeName.CORE)

# ensure we got a failure code
assert ret_val == ReturnCodes.ERROR_NO_RUN_DIR

# make the call to do a final stage. this dir was created above so it should be removed
ret_val = staging.run(run_dir, StagingType.FINAL_STAGING)
ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING)

# make sure of a successful return code and a missing json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir)

# set a valid run ID. however, although this one has an executor specified it has no tests listed
# set a valid run ID. however, although this one has an executor specified it has no tests listed in the DB
run_id: str = '9'

# set up the test directory
run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id)

# make the call to do an initial stage
ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)
ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE)

# make sure of a successful return code and a json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh'))

# make the call to do a final stage. this dir was created above so it should be removed
ret_val = staging.run(run_dir, StagingType.FINAL_STAGING)
ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING)

# make sure of a successful return code and a missing json file
assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir)


#@pytest.mark.skip(reason="Local test only")
@pytest.mark.skip(reason="Local test only")
def test_file_creation():
"""
tests the creation of a file that contains the requested tests
Expand Down

0 comments on commit a9bd38b

Please sign in to comment.