Skip to content

Commit

Permalink
perf(variable): improve variable access and validation
Browse files Browse the repository at this point in the history
- Added new methods to Argument, Parameter and Artifact classes to make it easy to access and
validate the input values
- Moved input validation to Arguments level so we can validate them
before adding them to the workflow.
- Added new tests for Parameter.
- Added a new module for
queenbee variables.

This commit partially breaks workflow hydrate which I will fix in the next
commit.

Resolve #12
  • Loading branch information
mostaphaRoudsari committed Jan 18, 2020
1 parent ef98150 commit 0cd3e8a
Show file tree
Hide file tree
Showing 15 changed files with 470 additions and 128 deletions.
66 changes: 46 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,6 @@ and how the output from one task will be consumed by another task(s). Such a wor
also known as Directed Acyclic Graph (DAG).

Here is an example of creating a sky and generating an octree as two consecutive steps.
Note that the values are place-holders and can be overwritten by input parameters file.


```yaml
Expand All @@ -267,24 +265,13 @@ flow:
inputs:
parameters:
- name: scene_files
value:
- "{{steps.generate_sky_steps.outputs.sky_file}}"
- "{{workflow.inputs.parameters.folder}}"/model/static/scene.mat
- "{{workflow.inputs.parameters.folder}}"/model/static/scene.rad
value: |
"{{tasks.generate_sky_steps.outputs.sky_file}}"
"{{workflow.inputs.parameters.folder}}"/model/static/scene.mat
"{{workflow.inputs.parameters.folder}}"/model/static/scene.rad
```

As you can see it is common to use the output of one step as an input for another step or
reference one of the workflow inputs as an input for one of the steps or tasks. Queenbee
supports the following words as prefix variable names inside the `flow` section:

- workflow: "{{workflow.xx.yy}} is used for workflow level parameters.
- tasks: "{{tasks.task_name.xx.yy}} is used in DAG task to refer to other tasks.
- inputs: "{{inputs.xx.yy}}" is used in operators.
- item: "{{item}}" or "{{item.key_name}}" is used in loops. You can change item to a
different keyword by setting up `loop_var` in `loop_control`.


Now let's think about a longer workflow which also includes ray-tracing using the
generated octree. We need to add two new steps to the workflow:

Expand All @@ -298,8 +285,7 @@ generating the sensor grids we we do not need to wait for generating sky to be f
Finally, the last step of ray-tracing will need both the grid and the octree.

To describe such flows we will use a Directed Acyclic Graph or DAG. Here
is the updated process. Note the the keyword `step` is changed to `tasks` and each `task`
has a key for `dependency`.
is the updated process.

Also since the step for generating grids can generate more than one grid we are using
loop to run ray-tracing for all these grids in parallel.
Expand All @@ -325,7 +311,7 @@ loop to run ray-tracing for all these grids in parallel.
```yaml
flow:
- name: jjjjj
- name: sample-workflow
- tasks:
- name: generate_sky_task
template: generate_sky
Expand Down Expand Up @@ -374,6 +360,46 @@ Outputs can also return `parameters` that are generated in the `process` section
workflow.


## Variables

As you can see it is common to use the output of one task as an input for another task or
reference one of the workflow inputs as an input for one of the steps or tasks. Queenbee
supports the following words as prefix variable names:

### Global variables

| Variable | Description|
|----------|------------|
| `workflow.name` | Replaced by workflow name |
| `workflow.id` | Replaced by workflow id |
| `workflow.inputs.parameters.<NAME>` | Replaced by `value` for parameter `<NAME>` |
| `workflow.inputs.artifacts.<NAME>` | Replaced by `path` for artifact `<NAME>` |
| `workflow.operators.<OPERATORNAME>.image` | Replaced by image name for operator `<OPERATORNAME>` |

### Flow variables

| Variable | Description|
|----------|------------|
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of any previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous task |

### Task variables

| Variable | Description|
|----------|------------|
| `inputs.parameters.<NAME>` | Input parameter of task `<NAME>`|
| `inputs.artifacts.<NAME>` | Input artifact of task `<NAME>`|
| `outputs.parameters.<NAME>` | Output parameter of task `<NAME>`|
| `outputs.artifacts.<NAME>` | Output artifact of task `<NAME>`|

### Loops

| Variable | Description|
|----------|------------|
| `item` | Replaced by the value of item |
| `item.<FIELDNAME>` | Replaced by the field value of the item |


# TODO: Command Line Interface

You can also use queenbee from command line. The most commonly used commands are:
Expand Down
149 changes: 140 additions & 9 deletions queenbee/schema/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
2. Artifact: An ``artifact`` is a file or folder that can be identified by a url or
a path.
"""
from queenbee.schema.qutil import BaseModel
from queenbee.schema.qutil import BaseModel, find_dup_items
from queenbee.schema.artifact_location import VerbEnum
from pydantic import Field
import queenbee.schema.variable as qbvar
from pydantic import Field, root_validator
from typing import List, Any, Optional, Dict


Expand Down Expand Up @@ -40,22 +41,69 @@ class Parameter(BaseModel):

path: str = Field(
None,
description='load parameters from a file. File can be a JSON / YAML or a text file.'
description='Load parameters values from a JSON file.'
)

@root_validator
def validate_vars(cls, values):
"""Validate input values."""
name = values.get('name')
value = values.get('value')
path = values.get('path')
if value and path:
raise ValueError(
f'You should either set value or path for parameter {name}.'
)

value = value if value is not None else path
if not value or isinstance(value, (int, float)):
return values
# check if it is a referenced variable
ref_var = qbvar.get_ref_variable(value)

if ref_var:
for rv in ref_var:
qbvar.validate_ref_variable_format(rv)

return values

@property
def current_value(self):
"""Try to get the current value.
This method checks the ``value`` property first and if it is None it will return
the value for ``path``.
"""
return self.value if self.value is not None else self.path

@property
def ref_vars(self) -> Dict[str, List[str]]:
"""Get referenced variables if any.
"""
value = self.current_value
if not value:
return {}

ref_var = qbvar.get_ref_variable(value)
if not ref_var:
return {}

return {'value': ref_var} if self.value is not None else {'path': ref_var}


class Artifact(BaseModel):
"""Artifact indicates an artifact to place at a specified path"""
"""Artifact indicates an artifact to be placed at a specified path."""

name: str = Field(
...,
description='name of the artifact. must be unique within a task\'s '
description='Name of the artifact. Must be unique within a task\'s '
'inputs / outputs.'
)

location: str = Field(
None,
description="Name of the Artifact Location to source this artifact from."
None, # is it possible to create an artifact with no artifact location?
description="Name of the artifact_location to source this artifact from."
)

source_path: str = Field(
Expand All @@ -75,14 +123,65 @@ class Artifact(BaseModel):

headers: Optional[Dict[str, str]] = Field(
None,
description="An object with Key Value pairs of HTTP headers. For artifacts from URL Location only"
description='An object with Key Value pairs of HTTP headers. '
'For artifacts from URL location only.'
)

verb: Optional[VerbEnum] = Field(
None,
description="The HTTP verb to use when making the request. For artifacts from URL Location only"
description='The HTTP verb to use when making the request. '
'For artifacts from URL location only.'
)

@root_validator
def validate_vars(cls, values):
"""Validate input values."""
input_values = [
v for v in (values.get('location'), values.get('path'),
values.get('source_path')) if v is not None
]

if not input_values:
return values

for value in values:
# check if it is a referenced variable
ref_var = qbvar.get_ref_variable(value)
if not ref_var:
continue
for rv in ref_var:
qbvar.validate_ref_variable_format(rv)

return values

@property
def current_value(self):
"""Try to get the current value.
This method checks the ``path`` property first and if it is None it will return
the value for ``source_path``.
"""
return self.path if self.path is not None else self.source_path

@property
def ref_vars(self) -> Dict[str, List[str]]:
"""Get referenced variables if any."""
ref_values = {}
values = [
v for v in (self.location, self.path, self.source_path)
if v is not None
]

if not values:
return ref_values

for value in values:
ref_var = qbvar.get_ref_variable(value)
if ref_var:
ref_values[value] = ref_var

return ref_values


class Arguments(BaseModel):
"""Arguments to a task or a workflow.
Expand All @@ -104,3 +203,35 @@ class Arguments(BaseModel):
description='Artifacts is the list of file and folder arguments to pass to the '
'task or workflow.'
)

@root_validator
def unique_names(cls, values):
params = values.get('parameters')
if params:
param_names = [par.name for par in params]
if len(param_names) != len(set(param_names)):
dup = find_dup_items(param_names)
raise ValueError(f'Duplicate parameter names: {dup}')
artifacts = values.get('artifacts')
if artifacts:
artifact_names = [par.name for par in artifacts]
if len(artifact_names) != len(set(artifact_names)):
dup = find_dup_items(artifact_names)
raise ValueError(f'Duplicate artifact names: {dup}')
return values

def get_parameter_value(self, name):
"""Get a parameter value by name."""
param = [par for par in self.parameters if par.name == name]
if not param:
raise ValueError(f'Invalid parameter name: {name}')
return param[0].current_value

def get_artifact_value(self, name):
"""Get an artifact value by name."""
if not self.artifacts:
raise ValueError('Arguments has no artifacts')
param = [par for par in self.artifacts if par.name == name]
if not param:
raise ValueError(f'Invalid artifact name: {name}')
return param[0].current_value
4 changes: 2 additions & 2 deletions queenbee/schema/artifact_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class VerbEnum(str, Enum):
class ArtifactLocation(BaseModel):
"""ArtifactLocation
An Artifact Location System
An Artifact Location System.
"""

name: str = Field(
Expand All @@ -35,7 +35,7 @@ class ArtifactLocation(BaseModel):

root: str = Field(
...,
description="The root path to the artifacts."
description='The root path to the artifacts.'
)


Expand Down
30 changes: 0 additions & 30 deletions queenbee/schema/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,6 @@ class Function(BaseModel):
description='List of output arguments.'
)

@validator('inputs')
def check_workflow_reference(cls, v):
if v == None:
return v

input_params = v.parameters

if input_params == None:
return v

ref_params = []

for param in input_params:
if not isinstance(param, (str, bytes)):
continue
if not param.value:
continue
if 'workflow.' in param.value:
ref_params.append(param)
if len(ref_params) > 0:
params = ['{}: {}'.format(param.name, param.value)
for param in ref_params]
warnings.warn(
'Referencing workflow parameters in a template function makes the'
' function less reusable. Try using inputs / outputs of the function'
' instead and assign workflow values in flow section when calling'
' this function.\n\t- {}'.format('\n\t-'.join(params))
)
return v

def validate_all(self):
"""Check that all the elements of the function are valid together"""
self.check_command_referenced_values()
Expand Down
4 changes: 2 additions & 2 deletions queenbee/schema/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ def _import_dict_data(dictionary, folder):


def parse_file(input_file):
"""Parse queenbee objects from an input JSON / YAML file.
"""Parse queenbee objects from an input JSON or YAML file.
This method will replace 'import_from' keys with the content from files recursively.
Args:
input_file: A YAML / JSON input file.
input_file: A YAML or JSON input file.
Returns:
The content of the input file as a dictionary.
Expand Down
12 changes: 9 additions & 3 deletions queenbee/schema/qutil.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Queenbee utility methods."""
"""Queenbee utility functions."""
from pydantic import BaseModel as PydanticBaseModel
from .parser import parse_file
import yaml
import json
import collections
from typing import List

# set up yaml.dump to keep the order of the input dictionary
# from https://stackoverflow.com/a/31609484/4394669


def _keep_name_order_in_yaml():
represent_dict_order = \
lambda self, data: self.represent_mapping(
Expand Down Expand Up @@ -58,3 +58,9 @@ def from_file(cls, filepath):

def __repr__(self):
return self.yaml()


def find_dup_items(values: List) -> List:
"""Find duplicate items in a list."""
dup = [t for t, c in collections.Counter(values).items() if c > 1]
return dup
Loading

0 comments on commit 0cd3e8a

Please sign in to comment.