Skip to content

Commit

Permalink
refactor(workflow): improve validators
Browse files Browse the repository at this point in the history
This commit adds a number of new validators and fixes the bug with finding workflow artifacts.

resolves #19, resolves #11
  • Loading branch information
mostaphaRoudsari committed Jan 18, 2020
1 parent 1be96ba commit ef98150
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 233 deletions.
10 changes: 10 additions & 0 deletions queenbee/schema/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,13 @@ class DAG(BaseModel):
...,
description='Tasks are a list of DAG steps'
)

@property
def artifacts(self):
    """Artifacts referenced by the arguments of this DAG's tasks.

    Returns:
        A new flat list with the artifacts of every task, in task order.
        Tasks without arguments, or whose arguments hold no artifacts, are
        skipped. Duplicates are kept.
    """
    artifacts = []
    for dag_task in self.tasks:
        arguments = dag_task.arguments
        if not arguments or not arguments.artifacts:
            continue
        # extend, not append: append would nest each task's list inside the
        # result instead of contributing its individual artifacts.
        artifacts.extend(arguments.artifacts)
    return artifacts
13 changes: 13 additions & 0 deletions queenbee/schema/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,16 @@ def validate_variable(variables, func_name, input_names):
'Illegal output parameter name in "%s": {{%s}}\n' \
'Valid inputs:\n\t- %s' % (func_name,
m, '\n\t- '.join(input_names))

@property
def artifacts(self):
    """Input and output artifacts declared by this function.

    Returns:
        A new list with the input artifacts first, followed by the output
        artifacts. A side that is unset or has no artifacts adds nothing.
    """
    collected = []
    # Look the two sides up lazily, in order, exactly one after the other.
    for side in ('inputs', 'outputs'):
        io = getattr(self, side)
        if io and io.artifacts:
            collected.extend(io.artifacts)
    return collected
132 changes: 85 additions & 47 deletions queenbee/schema/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
end steps for the workflow.
"""
from uuid import UUID, uuid4
import collections
import json
import os
import re
Expand Down Expand Up @@ -54,46 +55,84 @@ class Workflow(BaseModel):

@validator('artifact_locations', each_item=False)
def check_duplicate_run_folders(cls, v):
    """Ensure at most one artifact location is of type ``run-folder``.

    Args:
        v: The artifact locations to validate, checked as a whole.

    Returns:
        ``v`` unchanged when it holds zero or one run-folder location.

    Raises:
        ValueError: If more than one run-folder location is present.
    """
    if not v:
        # Nothing to count; also avoids iterating a missing (None) value.
        return v

    count = sum(1 for location in v if location.type == 'run-folder')
    if count > 1:
        # Raise instead of assert so the check still runs under `python -O`,
        # and to match the other validators of this model.
        raise ValueError(
            f'Workflow can only have 1 run-folder artifact location. Found {count}'
        )
    return v

@validator('templates', each_item=False)
def check_duplicate_template_name(cls, v):
    """Reject a template list in which two templates share a name.

    Args:
        v: The templates to validate; each one exposes a ``name``.

    Returns:
        ``v`` unchanged when every template name occurs exactly once.

    Raises:
        ValueError: Listing every name that occurs more than once.
    """
    occurrences = collections.Counter(t.name for t in v)
    # Names are reported in order of first appearance.
    dup = [name for name, seen in occurrences.items() if seen > 1]
    if dup:
        raise ValueError(
            'Each template name must be unique.'
            f' Duplicate template names: {dup}'
        )
    return v

@validator('flow')
def check_templates(cls, v, values):
    """Check template references in flow to ensure they exist.

    Args:
        v: The flow being validated; its tasks each name a ``template``.
        values: The fields validated before ``flow``.

    Returns:
        ``v`` unchanged when every task references a known template.

    Raises:
        ValueError: Listing the template names that are not defined.
    """
    templates = values.get('templates')
    if templates is None:
        # `templates` is absent from `values` when it did not pass its own
        # validation; that failure is reported on its own, so skip this check
        # instead of raising a bare KeyError.
        # NOTE(review): this also assumes `templates` is declared before
        # `flow` on the model - confirm the field order.
        return v

    template_names = {t.name for t in templates}
    invalid_names = [
        task.template for task in v.tasks
        if task.template not in template_names
    ]
    if invalid_names:
        raise ValueError(f'Found invalid template names in flow: {invalid_names}')
    return v

@validator('flow')
def check_dependencies(cls, v):
    """Verify that every task dependency names a task of the same flow.

    Args:
        v: The flow being validated.

    Returns:
        ``v`` unchanged when all dependencies refer to existing tasks.

    Raises:
        ValueError: Listing every dependency that matches no task name, in
            the order encountered (repeats included).
    """
    known = {task.name for task in v.tasks}
    invalid_names = [
        dependency
        for task in v.tasks
        # Tasks without dependencies contribute nothing.
        for dependency in (task.dependencies or ())
        if dependency not in known
    ]
    if invalid_names:
        raise ValueError(
            f'Invalid dependency names in flow tasks: {invalid_names}')
    return v

def validate_all(self):
    """Check that all elements of the workflow are valid together.

    Runs the artifact reference check first, then calls ``validate_all`` on
    every template whose ``type`` is ``'function'``.
    """
    self.check_artifact_references()

    # TODO: add validate all to DAG
    for template in self.templates:
        if template.type == 'function':
            template.validate_all()

def check_references_exist(self):
    """Check artifact references under the method's former name.

    Delegates to ``check_artifact_references`` so callers that still use the
    old name keep working.
    """
    return self.check_artifact_references()

def check_artifact_references(self):
    """Check artifact references.

    Checks whether every location referenced by the workflow's artifacts is
    listed in ``artifact_locations``. The check is skipped when
    ``artifact_locations`` is None.

    Returns:
        The workflow as a dictionary (the result of ``self.dict()``).

    Raises:
        ValueError: If an artifact references a location whose name is not
            listed in ``artifact_locations``.
    """
    values = self.dict()
    v = values.get('artifact_locations')
    if v is not None:
        locations = {x.get('name') for x in v}
        # Walk the artifacts in order so the first offending location that is
        # reported is the same on every run.
        for artifact in self.artifacts:
            source = artifact.location
            if source not in locations:
                raise ValueError(
                    'Artifact with location "{}" is not valid because it is not'
                    ' listed in the artifact_locations object.'.format(source)
                )

    return values

# TODO: add a validator to ensure all the names for templates are unique
# @validator('flow')
# def check_templates(cls, v, values):
# """Check templates in flow to ensure they exist."""
# template_names = set(t.name for t in values['templates'])
# for task in v.tasks:
# if task.template not in template_names:
# raise ValueError('{} is not a valid template.'.format(task.template))

def to_diagraph(self, filename=None):
"""Return a graphviz instance of a diagraph from workflow"""
if filename is None:
Expand Down Expand Up @@ -145,6 +184,30 @@ def nodes_links(self):

return {'nodes': nodes, 'links': links}

@property
def artifacts(self):
    """All artifacts referenced anywhere in this workflow.

    Returns:
        A new list built, in this order, from the artifacts of every
        template, the argument artifacts of every flow task, the workflow
        input artifacts and the workflow output artifacts. Missing or empty
        parts contribute nothing; duplicates are kept.
    """
    found = []

    for template in self.templates:
        found += template.artifacts

    for task in self.flow.tasks:
        task_args = task.arguments
        if task_args and task_args.artifacts:
            found += task_args.artifacts

    # Workflow-level inputs come before outputs.
    for side in ('inputs', 'outputs'):
        io = getattr(self, side)
        if io and io.artifacts:
            found += io.artifacts

    return found


def hydrate_templates(workflow, wf_value=None):
"""Replace all `{{ workflow.x.y.z }}` with corresponding value
Expand Down Expand Up @@ -200,31 +263,6 @@ def hydrate_templates(workflow, wf_value=None):
return wf_value


def list_artifacts(obj):
    """Collect the artifacts found in ``obj``.

    Entries under the ``'flow'`` and ``'templates'`` keys are searched
    recursively. A ``DAG`` contributes the argument artifacts of its tasks and
    a ``Function`` contributes its input and output artifacts.

    Args:
        obj: The object to search.

    Returns:
        A new list with every artifact found, in discovery order.
    """
    artifacts = []

    # NOTE(review): the two key lookups below presumably expect a mapping
    # whose 'flow' / 'templates' values are iterables of DAG / Function
    # objects - confirm against the callers.
    if 'flow' in obj:
        for dag in obj['flow']:
            artifacts += list_artifacts(dag)

    if 'templates' in obj:
        for template in obj['templates']:
            artifacts += list_artifacts(template)

    if isinstance(obj, DAG):
        for task in obj.tasks:
            # Tasks are read by attribute, so test the attribute values; an
            # `in` test on the task object does not look at its attributes.
            arguments = task.arguments
            if arguments and arguments.artifacts:
                artifacts.extend(arguments.artifacts)

    if isinstance(obj, Function):
        for io in (obj.inputs, obj.outputs):
            if io is not None and io.artifacts is not None:
                artifacts.extend(io.artifacts)

    return artifacts


# Required for the self-referencing model.
# see https://pydantic-docs.helpmanual.io/#self-referencing-models
Workflow.update_forward_refs()
Loading

0 comments on commit ef98150

Please sign in to comment.