Skip to content

Commit

Permalink
implement ConstraintNodeSpec
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Jul 14, 2024
1 parent c46f183 commit d63c50f
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package nextflow.nomad.config

class ConstraintNodeSpec {

private String id = null
private String name = null
private String clientClass = null
private String pool = null
private String dataCenter = null
private String region = null

String getId() {
return id
}

String getName() {
return name
}

String getClientClass() {
return clientClass
}

String getPool() {
return pool
}

String getDataCenter() {
return dataCenter
}

String getRegion() {
return region
}

ConstraintNodeSpec setUnique(Map map){
unique(map)
}

ConstraintNodeSpec unique(Map map){
this.id = map.containsKey("id") ? map["id"].toString() : null
this.name = map.containsKey("name") ? map["name"].toString() : null
this
}

ConstraintNodeSpec setClientClass(Object map){
clientClass(map)
}

ConstraintNodeSpec clientClass(Object clientClass){
this.clientClass = clientClass.toString()
this
}

ConstraintNodeSpec setPool(Object map){
pool(map)
}

ConstraintNodeSpec pool(Object pool){
this.pool = pool.toString()
this
}

ConstraintNodeSpec setDataCenter(Object map){
dataCenter(map)
}

ConstraintNodeSpec dataCenter(Object dataCenter){
this.dataCenter = dataCenter.toString()
this
}

ConstraintNodeSpec setRegion(Object map){
region(map)
}

ConstraintNodeSpec region(Object region){
this.region = region.toString()
this
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package nextflow.nomad.config

class ConstraintsSpec {

List<ConstraintNodeSpec> nodeSpecs = []

ConstraintsSpec node( @DelegatesTo(ConstraintNodeSpec)Closure closure){
ConstraintNodeSpec constraintNodeSpec = new ConstraintNodeSpec()
def clone = closure.rehydrate(constraintNodeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
nodeSpecs << constraintNodeSpec
this
}

void validate(){

}

static ConstraintsSpec parse(@DelegatesTo(ConstraintsSpec)Closure closure){
ConstraintsSpec constraintsSpec = new ConstraintsSpec()
def clone = closure.rehydrate(constraintsSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
constraintsSpec.validate()
constraintsSpec
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class NomadJobOpts{
AffinitySpec affinitySpec
ConstraintSpec constraintSpec

ConstraintsSpec constraintsSpec

NomadJobOpts(Map nomadJobOpts, Map<String,String> env=null){
assert nomadJobOpts!=null

Expand Down Expand Up @@ -69,6 +71,7 @@ class NomadJobOpts{
this.volumeSpec = parseVolumes(nomadJobOpts)
this.affinitySpec = parseAffinity(nomadJobOpts)
this.constraintSpec = parseConstraint(nomadJobOpts)
this.constraintsSpec = parseConstraints(nomadJobOpts)
}

VolumeSpec[] parseVolumes(Map nomadJobOpts){
Expand Down Expand Up @@ -110,6 +113,7 @@ class NomadJobOpts{

AffinitySpec parseAffinity(Map nomadJobOpts) {
if (nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure) {
log.info "affinity config will be deprecated, use affinities closure instead"
def affinitySpec = new AffinitySpec()
def closure = (nomadJobOpts.affinity as Closure)
def clone = closure.rehydrate(affinitySpec, closure.owner, closure.thisObject)
Expand All @@ -124,6 +128,7 @@ class NomadJobOpts{

ConstraintSpec parseConstraint(Map nomadJobOpts){
if (nomadJobOpts.constraint && nomadJobOpts.constraint instanceof Closure) {
log.info "constraint config will be deprecated, use constraints closure instead"
def constraintSpec = new ConstraintSpec()
def closure = (nomadJobOpts.constraint as Closure)
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
Expand All @@ -135,4 +140,18 @@ class NomadJobOpts{
null
}
}

ConstraintsSpec parseConstraints(Map nomadJobOpts){
if (nomadJobOpts.constraints && nomadJobOpts.constraints instanceof Closure) {
def constraintsSpec = new ConstraintsSpec()
def closure = (nomadJobOpts.constraints as Closure)
def clone = closure.rehydrate(constraintsSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
constraintsSpec.validate()
constraintsSpec
}else{
null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.model.*
import nextflow.nomad.config.ConstraintNodeSpec
import nextflow.nomad.config.ConstraintsSpec
import nextflow.nomad.config.NomadConfig
import nextflow.nomad.config.VolumeSpec
import nextflow.processor.TaskRun
Expand Down Expand Up @@ -182,7 +184,8 @@ class NomadService implements Closeable{

volumes(task, taskDef, workingDir)
affinity(task, taskDef)
constrains(task, taskDef)
constrain(task, taskDef)
constraints(task, taskDef)

return taskDef
}
Expand Down Expand Up @@ -233,7 +236,7 @@ class NomadService implements Closeable{
taskDef
}

protected Task constrains(TaskRun task, Task taskDef){
protected Task constrain(TaskRun task, Task taskDef){
if( config.jobOpts().constraintSpec ){
def constraint = new Constraint()
if(config.jobOpts().constraintSpec.attribute){
Expand All @@ -251,8 +254,84 @@ class NomadService implements Closeable{
taskDef
}

protected Task constraints(TaskRun task, Task taskDef){
def constraints = [] as List<Constraint>

if( config.jobOpts().constraintsSpec ){
def list = constraintsSpecToList(config.jobOpts().constraintsSpec)
constraints.addAll(list)
}

if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) &&
task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) {
Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure
ConstraintsSpec constraintsSpec = ConstraintsSpec.parse(closure)
def list = constraintsSpecToList(constraintsSpec)
constraints.addAll(list)
}

if( constraints.size()) {
taskDef.constraints(constraints)
}
taskDef
}

protected List<Constraint> constraintsSpecToList(ConstraintsSpec spec){
def constraints = [] as List<Constraint>
if( spec?.nodeSpecs ){
def nodes = config.jobOpts()
.constraintsSpec
?.nodeSpecs
?.collect({ nodeConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
return constraints
}

protected List<Constraint> nodeConstraints(ConstraintNodeSpec nodeSpec){
def ret = [] as List<Constraint>
if( nodeSpec.id ){
ret.add new Constraint()
.ltarget('${node.unique.id}')
.operand("=")
.rtarget(nodeSpec.id)
}
if( nodeSpec.name ){
ret.add new Constraint()
.ltarget('${node.unique.name}')
.operand("=")
.rtarget(nodeSpec.name)
}
if( nodeSpec.clientClass ){
ret.add new Constraint()
.ltarget('${node.class}')
.operand("=")
.rtarget(nodeSpec.clientClass)
}
if( nodeSpec.dataCenter ){
ret.add new Constraint()
.ltarget('${node.datacenter}')
.operand("=")
.rtarget(nodeSpec.dataCenter)
}
if( nodeSpec.region ){
ret.add new Constraint()
.ltarget('${node.region}')
.operand("=")
.rtarget(nodeSpec.region)
}
if( nodeSpec.pool ){
ret.add new Constraint()
.ltarget('${node.pool}')
.operand("=")
.rtarget(nodeSpec.pool)
}
ret
}

protected Job assignDatacenters(TaskRun task, Job job){
def datacenters = task.processor?.config?.get("datacenters")
def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
if( datacenters ){
if( datacenters instanceof List<String>) {
job.datacenters( datacenters as List<String>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ class TaskDirectives {

public static final String DATACENTERS = "datacenters"

public static final String CONSTRAINTS = "constraints"

public static final List<String> ALL = [
DATACENTERS
DATACENTERS,
CONSTRAINTS
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package nextflow.nomad
package nextflow.nomad.config

import nextflow.nomad.config.NomadConfig
import nextflow.nomad.config.VolumeSpec
Expand Down
Loading

0 comments on commit d63c50f

Please sign in to comment.