From a9bd38b26b8ebdbc63738102c2c00e3b3b8405a6 Mon Sep 17 00:00:00 2001 From: Phil Owen <19691521+PhillipsOwen@users.noreply.github.com> Date: Mon, 26 Feb 2024 10:28:09 -0500 Subject: [PATCH] adding the run id --- main.py | 5 +++-- src/staging/staging.py | 26 ++++++++++---------------- src/tests/test_staging.py | 24 ++++++++++++------------ 3 files changed, 25 insertions(+), 30 deletions(-) diff --git a/main.py b/main.py index d4d32d0..627e7f9 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ parser = ArgumentParser() # declare the command params + parser.add_argument('--run_id', default=None, help='The run identifier.', type=str, required=True) parser.add_argument('--run_dir', default=None, help='The name of the run directory to use for the staging operations.', type=str, required=True) parser.add_argument('--step_type', default=None, help='The type of staging step, initial or final.', type=str, required=True) parser.add_argument('--workflow_type', default='CORE', help='The type of workflow, CORE, TOPOLOGY, etc..', type=str, required=False) @@ -41,7 +42,7 @@ ret_val: int = 0 # validate the inputs - if args.run_dir != '' and args.step_type != '' and args.workflow_type != '': + if args.run_id != '' and args.run_id != '' and args.step_type != '' and args.workflow_type != '': try: # check to make sure we got a legit staging and workflow types # these will throw exceptions if they are not @@ -57,7 +58,7 @@ # should we continue? if ret_val == 0: # do the staging - ret_val = stage_obj.run(args.run_dir, args.step_type, args.workflow_type) + ret_val = stage_obj.run(args.run_id, args.run_dir, args.step_type, args.workflow_type) # exit with the final exit code sys.exit(ret_val) diff --git a/src/staging/staging.py b/src/staging/staging.py index 4421079..b7a3800 100644 --- a/src/staging/staging.py +++ b/src/staging/staging.py @@ -45,47 +45,40 @@ def __init__(self): # create a DB connection object self.db_info: PGImplementation = PGImplementation(db_names, _logger=self.logger) - def run(self, run_dir: str, step_type: StagingType, workflow_type: WorkflowTypeName = WorkflowTypeName.CORE) -> int: + def run(self, run_id: str, run_dir: str, step_type: StagingType, workflow_type: WorkflowTypeName = WorkflowTypeName.CORE) -> int: """ Performs the requested type of staging operation. The supervisor will mount the /data directory for this component by default. - :param workflow_type: + :param run_id: The id of the run. :param run_dir: The base path of the directory to use for the staging operations. :param step_type: The type of staging step, either 'initial' or 'final' + :param workflow_type: :return: """ # init the return value ret_val: int = ReturnCodes.EXIT_CODE_SUCCESS - # parse the run id differently in Windows environments - if sys.platform == 'win32': - # get the run ID - run_id: str = run_dir.split('\\')[-1] - else: - # get the run ID - run_id: str = run_dir.split('/')[-1] - # is this an initial stage step? if step_type == StagingType.INITIAL_STAGING: # make the call to perform the op - ret_val = self.initial_staging(run_dir, run_id, step_type, workflow_type) + ret_val = self.initial_staging(run_id, run_dir, step_type, workflow_type) # else this a final stage step elif step_type == StagingType.FINAL_STAGING: # make the call to perform the op - ret_val = self.final_staging(run_dir, run_id, step_type) + ret_val = self.final_staging(run_id, run_dir, step_type) # return to the caller return ret_val - def initial_staging(self, run_dir: str, run_id: str, staging_type: StagingType, workflow_type: WorkflowTypeName) -> int: + def initial_staging(self, run_id: str, run_dir: str, staging_type: StagingType, workflow_type: WorkflowTypeName) -> int: """ Performs the initial staging - :param run_dir: The path of the directory to use for the staging operations. :param run_id: The ID of the supervisor run request. + :param run_dir: The path of the directory to use for the staging operations. :param staging_type: The type of staging step, either 'initial' or 'final' :param workflow_type: the type of workflow @@ -94,7 +87,8 @@ def initial_staging(self, run_dir: str, run_id: str, staging_type: StagingType, # init the return code ret_val: int = ReturnCodes.EXIT_CODE_SUCCESS - self.logger.info('Initial staging version %s start: run_dir: %s, workflow type: %s', self.app_version, run_dir, workflow_type) + self.logger.info('Initial staging version %s start: run_id: %s, run_dir: %s, workflow type: %s', self.app_version, run_id, run_dir, + workflow_type) try: # try to make the call for records @@ -230,7 +224,7 @@ def create_test_files(self, run_dir: str, run_data: json, workflow_type: Workflo # return to the caller return ret_val - def final_staging(self, run_dir: str, run_id: str, staging_type: StagingType) -> int: + def final_staging(self, run_id: str, run_dir: str, staging_type: StagingType) -> int: """ Performs the initial staging diff --git a/src/tests/test_staging.py b/src/tests/test_staging.py index 3b0296c..49cf09e 100644 --- a/src/tests/test_staging.py +++ b/src/tests/test_staging.py @@ -37,10 +37,10 @@ def test_run(): run_id: str = '0' # set up the test directory - run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id) + run_dir: str = os.path.join(os.getenv('TEST_PATH'), 'grp3') # make the call to do an initial stage with an invalid run id - ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) + ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) # ensure we got a failure code assert ret_val == ReturnCodes.DB_ERROR @@ -49,58 +49,58 @@ def test_run(): run_id: str = '3' # set up the test directory - run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id) + run_dir: str = os.path.join(os.getenv('TEST_PATH'), 'grp3') # make the call to do an initial stage - ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.TOPOLOGY) + ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.TOPOLOGY) # make sure of a successful return code and a json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh')) # make the call to do a final stage. this dir was created above so it should be removed - ret_val = staging.run(run_dir, StagingType.FINAL_STAGING) + ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING) # make sure of a successful return code and a missing json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir) # make the call to do an initial stage - ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) + ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) # make sure of a successful return code and a json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh')) # make the call to do a final stage. this is an invalid directory and should fail - ret_val = staging.run(os.path.join(os.getenv('TEST_PATH'), '0'), StagingType.FINAL_STAGING, WorkflowTypeName.CORE) + ret_val = staging.run(run_id, os.path.join(os.getenv('TEST_PATH'), '0'), StagingType.FINAL_STAGING, WorkflowTypeName.CORE) # ensure we got a failure code assert ret_val == ReturnCodes.ERROR_NO_RUN_DIR # make the call to do a final stage. this dir was created above so it should be removed - ret_val = staging.run(run_dir, StagingType.FINAL_STAGING) + ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING) # make sure of a successful return code and a missing json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir) - # set a valid run ID. however, although this one has an executor specified it has no tests listed + # set a valid run ID. however, although this one has an executor specified it has no tests listed in the DB run_id: str = '9' # set up the test directory run_dir: str = os.path.join(os.getenv('TEST_PATH'), run_id) # make the call to do an initial stage - ret_val = staging.run(run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) + ret_val = staging.run(run_id, run_dir, StagingType.INITIAL_STAGING, WorkflowTypeName.CORE) # make sure of a successful return code and a json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isfile(os.path.join(run_dir, 'PROVIDER_test_list.sh')) # make the call to do a final stage. this dir was created above so it should be removed - ret_val = staging.run(run_dir, StagingType.FINAL_STAGING) + ret_val = staging.run(run_id, run_dir, StagingType.FINAL_STAGING) # make sure of a successful return code and a missing json file assert ret_val == ReturnCodes.EXIT_CODE_SUCCESS and not os.path.isdir(run_dir) -#@pytest.mark.skip(reason="Local test only") +@pytest.mark.skip(reason="Local test only") def test_file_creation(): """ tests the creation of a file that contains the requested tests