From f5021bb07bb8638aef9164cc64e742dde4c7fe76 Mon Sep 17 00:00:00 2001
From: Kai Waldrant
Date: Fri, 20 Sep 2024 16:52:05 +0200
Subject: [PATCH] Update workflow dependencies (#20)

* Update workflow dependencies
* Update process_datasets workflow
* Update run benchmark workflow
* Update changelog
* Refactor run benchmark workflow
---
 CHANGELOG.md                                  |   2 +
 _viash.yaml                                   |   5 +-
 .../process_datasets/config.vsh.yaml          |   4 +-
 src/workflows/process_datasets/main.nf        | 121 +-----------
 src/workflows/run_benchmark/config.vsh.yaml   |   6 +-
 src/workflows/run_benchmark/main.nf           | 176 +++---------------
 6 files changed, 40 insertions(+), 274 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cd7577..c9cb64b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,8 @@
 
 * Update test_resources path in components (PR #18).
 
+* Update workflows to use core repository dependency (PR #20).
+
 ## BUG FIXES
 
 * Update the nextflow workflow dependencies (PR #17).
diff --git a/_viash.yaml b/_viash.yaml
index a31e89c..1600fd7 100644
--- a/_viash.yaml
+++ b/_viash.yaml
@@ -72,10 +72,11 @@ authors:
     orcid: "0009-0003-8555-1361"
 
 repositories:
-  - name: openproblems
+  - name: core
     type: github
-    repo: openproblems-bio/openproblems
+    repo: openproblems-bio/core
     tag: build/main
+    path: viash/core
 
 viash_version: 0.9.0
 
diff --git a/src/workflows/process_datasets/config.vsh.yaml b/src/workflows/process_datasets/config.vsh.yaml
index 3d283dd..7d98b4b 100644
--- a/src/workflows/process_datasets/config.vsh.yaml
+++ b/src/workflows/process_datasets/config.vsh.yaml
@@ -23,8 +23,8 @@ resources:
     entrypoint: run_wf
   - path: /common/nextflow_helpers/helper.nf
 dependencies:
-  - name: common/check_dataset_schema
-    repository: openproblems
+  - name: schema/verify_data_structure
+    repository: core
   - name: data_processors/process_dataset
 runners:
   - type: nextflow
diff --git a/src/workflows/process_datasets/main.nf b/src/workflows/process_datasets/main.nf
index 7a3f952..8b21d78 100644
--- a/src/workflows/process_datasets/main.nf
+++ b/src/workflows/process_datasets/main.nf
@@ -1,7 +1,7 @@
 include { findArgumentSchema } from "${meta.resources_dir}/helper.nf"
 
 workflow auto {
-  findStatesTemp(params, meta.config)
+  findStates(params, meta.config)
     | meta.workflow.run(
       auto: [publish: "state"]
     )
@@ -14,7 +14,7 @@ workflow run_wf {
 
   main:
   output_ch = input_ch
-    | check_dataset_schema.run(
+    | verify_data_structure.run(
       fromState: { id, state ->
         def schema = findArgumentSchema(meta.config, "input")
         def schemaYaml = tempFile("schema.yaml")
@@ -52,120 +52,3 @@ workflow run_wf {
   emit:
   output_ch
 }
-
-// temp fix for rename_keys typo
-
-def findStatesTemp(Map params, Map config) {
-  def auto_config = deepClone(config)
-  def auto_params = deepClone(params)
-
-  auto_config = auto_config.clone()
-  // override arguments
-  auto_config.argument_groups = []
-  auto_config.arguments = [
-    [
-      type: "string",
-      name: "--id",
-      description: "A dummy identifier",
-      required: false
-    ],
-    [
-      type: "file",
-      name: "--input_states",
-      example: "/path/to/input/directory/**/state.yaml",
-      description: "Path to input directory containing the datasets to be integrated.",
-      required: true,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--filter",
-      example: "foo/.*/state.yaml",
-      description: "Regex to filter state files by path.",
-      required: false
-    ],
-    // to do: make this a yaml blob?
-    [
-      type: "string",
-      name: "--rename_keys",
-      example: ["newKey1:oldKey1", "newKey2:oldKey2"],
-      description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
-      required: false,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--settings",
-      example: '{"output_dataset": "dataset.h5ad", "k": 10}',
-      description: "Global arguments as a JSON glob to be passed to all components.",
-      required: false
-    ]
-  ]
-  if (!(auto_params.containsKey("id"))) {
-    auto_params["id"] = "auto"
-  }
-
-  // run auto config through processConfig once more
-  auto_config = processConfig(auto_config)
-
-  workflow findStatesTempWf {
-    helpMessage(auto_config)
-
-    output_ch =
-      channelFromParams(auto_params, auto_config)
-        | flatMap { autoId, args ->
-
-          def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]
-
-          // look for state files in input dir
-          def stateFiles = args.input_states
-
-          // filter state files by regex
-          if (args.filter) {
-            stateFiles = stateFiles.findAll{ stateFile ->
-              def stateFileStr = stateFile.toString()
-              def matcher = stateFileStr =~ args.filter
-              matcher.matches()}
-          }
-
-          // read in states
-          def states = stateFiles.collect { stateFile ->
-            def state_ = readTaggedYaml(stateFile)
-            [state_.id, state_]
-          }
-
-          // construct renameMap
-          if (args.rename_keys) {
-            def renameMap = args.rename_keys.collectEntries{renameString ->
-              def split = renameString.split(":")
-              assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey,newKey:oldKey'"
-              split
-            }
-
-            // rename keys in state, only let states through which have all keys
-            // also add global settings
-            states = states.collectMany{id, state ->
-              def newState = [:]
-
-              for (key in renameMap.keySet()) {
-                def origKey = renameMap[key]
-                if (!(state.containsKey(origKey))) {
-                  return []
-                }
-                newState[key] = state[origKey]
-              }
-
-              [[id, globalSettings + newState]]
-            }
-          }
-
-          states
-        }
-    emit:
-    output_ch
-  }
-
-  return findStatesTempWf
-}
\ No newline at end of file
diff --git a/src/workflows/run_benchmark/config.vsh.yaml b/src/workflows/run_benchmark/config.vsh.yaml
index 1d408ee..e162544 100644
--- a/src/workflows/run_benchmark/config.vsh.yaml
+++ b/src/workflows/run_benchmark/config.vsh.yaml
@@ -52,10 +52,8 @@ resources:
   - type: file
     path: /_viash.yaml
 dependencies:
-  - name: common/check_dataset_schema
-    repository: openproblems
-  - name: common/extract_metadata
-    repository: openproblems
+  - name: h5ad/extract_uns_metadata
+    repository: core
   - name: control_methods/no_denoising
   - name: control_methods/perfect_denoising
   - name: methods/alra
diff --git a/src/workflows/run_benchmark/main.nf b/src/workflows/run_benchmark/main.nf
index 14d2494..fe8defb 100644
--- a/src/workflows/run_benchmark/main.nf
+++ b/src/workflows/run_benchmark/main.nf
@@ -1,5 +1,5 @@
 workflow auto {
-  findStatesTemp(params, meta.config)
+  findStates(params, meta.config)
     | meta.workflow.run(
       auto: [publish: "state"]
     )
@@ -37,7 +37,7 @@ workflow run_wf {
     }
 
     // extract the dataset metadata
-    | extract_metadata.run(
+    | extract_uns_metadata.run(
       fromState: [input: "input_test"],
       toState: { id, output, state ->
         state + [
@@ -100,6 +100,27 @@
       }
     )
 
+    // extract the scores
+    | extract_uns_metadata.run(
+      key: "extract_scores",
+      fromState: [input: "metric_output"],
+      toState: { id, output, state ->
+        state + [
+          score_uns: readYaml(output.output).uns
+        ]
+      }
+    )
+
+    | joinStates { ids, states ->
+      // store the scores in a file
+      def score_uns = states.collect{it.score_uns}
+      def score_uns_yaml_blob = toYamlBlob(score_uns)
+      def score_uns_file = tempFile("score_uns.yaml")
+      score_uns_file.write(score_uns_yaml_blob)
+
+      ["output", [output_scores: score_uns_file]]
+    }
+
   /******************************
    * GENERATE OUTPUT YAML FILES *
    ******************************/
@@ -108,7 +129,7 @@
   // so code related to normalization_ids is commented out
 
   // extract the dataset metadata
-  dataset_meta_ch = dataset_ch
+  meta_ch = dataset_ch
     // // only keep one of the normalization methods
     // | filter{ id, state ->
    //   state.dataset_uns.normalization_id == "log_cp10k"
@@ -124,23 +145,6 @@
       def dataset_uns_file = tempFile("dataset_uns.yaml")
       dataset_uns_file.write(dataset_uns_yaml_blob)
 
-      ["output", [output_dataset_info: dataset_uns_file]]
-    }
-
-  output_ch = score_ch
-
-    // extract the scores
-    | extract_metadata.run(
-      key: "extract_scores",
-      fromState: [input: "metric_output"],
-      toState: { id, output, state ->
-        state + [
-          score_uns: readYaml(output.output).uns
-        ]
-      }
-    )
-
-    | joinStates { ids, states ->
       // store the method configs in a file
       def method_configs = methods.collect{it.config}
       def method_configs_yaml_blob = toYamlBlob(method_configs)
@@ -155,25 +159,21 @@
 
       def task_info_file = meta.resources_dir.resolve("_viash.yaml")
 
-      // store the scores in a file
-      def score_uns = states.collect{it.score_uns}
-      def score_uns_yaml_blob = toYamlBlob(score_uns)
-      def score_uns_file = tempFile("score_uns.yaml")
-      score_uns_file.write(score_uns_yaml_blob)
-
+      // create output
       def new_state = [
         output_method_configs: method_configs_file,
         output_metric_configs: metric_configs_file,
         output_task_info: task_info_file,
-        output_scores: score_uns_file,
+        output_dataset_info: dataset_uns_file,
         _meta: states[0]._meta
       ]
 
       ["output", new_state]
     }
 
-  // merge all of the output data
-  | mix(dataset_meta_ch)
+  // merge all of the output data
+  output_ch = score_ch
+    | mix(meta_ch)
     | joinStates{ ids, states ->
       def mergedStates = states.inject([:]) { acc, m -> acc + m }
       [ids[0], mergedStates]
@@ -182,121 +182,3 @@ workflow run_wf {
   emit:
   output_ch
 }
-
-
-// temp fix for rename_keys typo
-
-def findStatesTemp(Map params, Map config) {
-  def auto_config = deepClone(config)
-  def auto_params = deepClone(params)
-
-  auto_config = auto_config.clone()
-  // override arguments
-  auto_config.argument_groups = []
-  auto_config.arguments = [
-    [
-      type: "string",
-      name: "--id",
-      description: "A dummy identifier",
-      required: false
-    ],
-    [
-      type: "file",
-      name: "--input_states",
-      example: "/path/to/input/directory/**/state.yaml",
-      description: "Path to input directory containing the datasets to be integrated.",
-      required: true,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--filter",
-      example: "foo/.*/state.yaml",
-      description: "Regex to filter state files by path.",
-      required: false
-    ],
-    // to do: make this a yaml blob?
-    [
-      type: "string",
-      name: "--rename_keys",
-      example: ["newKey1:oldKey1", "newKey2:oldKey2"],
-      description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
-      required: false,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--settings",
-      example: '{"output_dataset": "dataset.h5ad", "k": 10}',
-      description: "Global arguments as a JSON glob to be passed to all components.",
-      required: false
-    ]
-  ]
-  if (!(auto_params.containsKey("id"))) {
-    auto_params["id"] = "auto"
-  }
-
-  // run auto config through processConfig once more
-  auto_config = processConfig(auto_config)
-
-  workflow findStatesTempWf {
-    helpMessage(auto_config)
-
-    output_ch =
-      channelFromParams(auto_params, auto_config)
-        | flatMap { autoId, args ->
-
-          def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]
-
-          // look for state files in input dir
-          def stateFiles = args.input_states
-
-          // filter state files by regex
-          if (args.filter) {
-            stateFiles = stateFiles.findAll{ stateFile ->
-              def stateFileStr = stateFile.toString()
-              def matcher = stateFileStr =~ args.filter
-              matcher.matches()}
-          }
-
-          // read in states
-          def states = stateFiles.collect { stateFile ->
-            def state_ = readTaggedYaml(stateFile)
-            [state_.id, state_]
-          }
-
-          // construct renameMap
-          if (args.rename_keys) {
-            def renameMap = args.rename_keys.collectEntries{renameString ->
-              def split = renameString.split(":")
-              assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey;newKey:oldKey'"
-              split
-            }
-
-            // rename keys in state, only let states through which have all keys
-            // also add global settings
-            states = states.collectMany{id, state ->
-              def newState = [:]
-
-              for (key in renameMap.keySet()) {
-                def origKey = renameMap[key]
-                if (!(state.containsKey(origKey))) {
-                  return []
-                }
-                newState[key] = state[origKey]
-              }
-
-              [[id, globalSettings + newState]]
-            }
-          }
-
-          states
-        }
-    emit:
-    output_ch
-  }
-
-  return findStatesTempWf
-}
\ No newline at end of file