Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an exception handler for api client #72

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ class NomadClientOpts{

final static protected API_VERSION = "v1"

private Map<String,String> sysEnv

final String address
final String token
final int connectionTimeout

final int readTimeout
final int writeTimeout

final RetryConfig retryConfig

NomadClientOpts(Map nomadClientOpts, Map<String,String> env=null){
assert nomadClientOpts!=null

sysEnv = env==null ? new HashMap<String,String>(System.getenv()) : env
def sysEnv = env ?: new HashMap<String,String>(System.getenv())

def address = (nomadClientOpts.address?.toString() ?: sysEnv.get('NOMAD_ADDR'))
assert address != null, "Nomad Address is required"
Expand All @@ -50,8 +54,17 @@ class NomadClientOpts{
address +="/"
this.address = address + API_VERSION
this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN')
this.connectionTimeout = (nomadClientOpts.connectionTimeout ?: 6000 ) as Integer
this.readTimeout = (nomadClientOpts.readTimeout ?: 6000 ) as Integer
this.writeTimeout = (nomadClientOpts.writeTimeout ?: 6000 ) as Integer

this.retryConfig = new RetryConfig(nomadClientOpts.retryConfig as Map ?: Collections.emptyMap())

//TODO: Add mTLS properties and env vars
// https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables
}

RetryConfig getRetryConfig() {
return retryConfig
}
}
29 changes: 29 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package nextflow.nomad.config

import groovy.transform.CompileStatic
import nextflow.util.Duration


@CompileStatic
class RetryConfig {

Duration delay = Duration.of('250ms')
Duration maxDelay = Duration.of('90s')
int maxAttempts = 10
double jitter = 0.25

RetryConfig(){
this(Collections.emptyMap())
}

RetryConfig(Map config){
if( config.delay )
delay = config.delay as Duration
if( config.maxDelay )
maxDelay = config.maxDelay as Duration
if( config.maxAttempts )
maxAttempts = config.maxAttempts as int
if( config.jitter )
jitter = config.jitter as double
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package nextflow.nomad.executor

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiException
import nextflow.nomad.config.RetryConfig

import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import java.util.function.Predicate

@Slf4j
@CompileStatic
class FailsafeExecutor {

private RetryConfig config

FailsafeExecutor(RetryConfig config){
this.config = config
}

protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {

final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
@Override
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
log.debug("Nomad TooManyRequests response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
}
}
return RetryPolicy.<T>builder()
.handleIf(cond)
.withBackoff(config.delay.toMillis(), config.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(config.maxAttempts)
.withJitter(config.jitter)
.onRetry(listener)
.build()
}

final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jagedn, is there a specific dictionary/reference you used to focus only on these exit codes?

Maybe a good idea to add that in the comments or just explain the through process behind these exit codes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hahaha c&p from the azure plugin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, in that case these must be specific to the Azure Batch API 🤔

@tomiles @matthdsm @jhaezebr , are you aware of any Nomad specific error codes or does a Nomad client/server just propogate the task-and-OS level error code?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think so, they are common http error codes

I've added descriptions for them

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, I was under the impression that nomad would have it's own set of error codes as well but judging from this issue hashicorp/nomad#17782 I think that for client/task errors it stores them in exit code

Could be related to #77 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree but maybe better handle the ext code in the #77 and let this PR handle the infra/http errors

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good - then let's merge and make an edge release :)


protected <T> T apply(CheckedSupplier<T> action) {
// define the retry condition
final cond = new Predicate<? extends Throwable>() {
@Override
boolean test(Throwable t) {
if( t instanceof ApiException && t.code in RETRY_CODES )
return true
if( t instanceof IOException || t.cause instanceof IOException )
return true
if( t instanceof TimeoutException || t.cause instanceof TimeoutException )
return true
return false
}
}
// create the retry policy object
final policy = retryPolicy(cond)
// apply the action with
return Failsafe.with(policy).get(action)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ class NomadService implements Closeable{
ApiClient apiClient
JobsApi jobsApi
VariablesApi variablesApi
FailsafeExecutor safeExecutor

NomadService(NomadConfig config) {
this.config = config

//TODO: Accommodate these connection level options in clientOpts()
final CONNECTION_TIMEOUT_MILLISECONDS = 60000
final READ_TIMEOUT_MILLISECONDS = 60000
final WRITE_TIMEOUT_MILLISECONDS = 60000
final CONNECTION_TIMEOUT_MILLISECONDS = config.clientOpts().connectionTimeout
final READ_TIMEOUT_MILLISECONDS = config.clientOpts().readTimeout
final WRITE_TIMEOUT_MILLISECONDS = config.clientOpts().writeTimeout

apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
apiClient.basePath = config.clientOpts().address
Expand All @@ -64,6 +64,8 @@ class NomadService implements Closeable{
}
this.jobsApi = new JobsApi(apiClient)
this.variablesApi = new VariablesApi(apiClient)

this.safeExecutor = new FailsafeExecutor(config.clientOpts().retryConfig)
}


Expand Down Expand Up @@ -96,8 +98,12 @@ class NomadService implements Closeable{
}

try {
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null)
return jobRegisterResponse.evalID
safeExecutor.apply {
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest,
config.jobOpts().region, config.jobOpts().namespace,
null, null)
jobRegisterResponse.evalID
}
} catch (ApiException apiException) {
log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
Expand All @@ -110,7 +116,11 @@ class NomadService implements Closeable{

String getJobState(String jobId){
try {
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
AllocationListStub last = allocations?.sort {
it.modifyIndex
}?.last()
Expand All @@ -127,7 +137,10 @@ class NomadService implements Closeable{

boolean checkIfRunning(String jobId){
try {
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
job.status == "running"
}catch (Exception e){
Expand All @@ -138,7 +151,10 @@ class NomadService implements Closeable{

boolean checkIfDead(String jobId){
try{
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
Expand All @@ -158,15 +174,22 @@ class NomadService implements Closeable{
protected void purgeJob(String jobId, boolean purge){
log.debug "[NOMAD] purgeJob with jobId=${jobId}"
try {
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true)
safeExecutor.apply {
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, purge, true)
}
}catch(Exception e){
log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e)
}
}

String getClientOfJob(String jobId) {
try{
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
if( !allocations ){
return null
}
Expand All @@ -183,10 +206,12 @@ class NomadService implements Closeable{
}

String getVariableValue(String path, String key){
var variable = variablesApi.getVariableQuery("$path/$key",
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
var variable = safeExecutor.apply {
variablesApi.getVariableQuery("$path/$key",
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
variable?.items?.find{ it.key == key }?.value
}

Expand All @@ -197,17 +222,22 @@ class NomadService implements Closeable{
void setVariableValue(String path, String key, String value){
var content = Map.of(key,value)
var variable = new Variable(path: path, items: content)
variablesApi.postVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
safeExecutor.apply {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this clean design @jagedn 🤩

variablesApi.postVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}
}

List<String> getVariablesList(){
var listRequest = variablesApi.getVariablesListRequest(
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
var listRequest = safeExecutor.apply {
variablesApi.getVariablesListRequest(
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null,
null, null, null)
}
String path = (config.jobOpts().secretOpts?.path ?: '')+"/"
listRequest.collect{ it.path - path}
}
Expand All @@ -218,9 +248,11 @@ class NomadService implements Closeable{

void deleteVariable(String path, String key){
var variable = new Variable( items: Map.of(key, ""))
variablesApi.deleteVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
safeExecutor.apply {
variablesApi.deleteVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}
}
}
6 changes: 5 additions & 1 deletion plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ class NomadDSLSpec extends Dsl2Spec{
[
client:
[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
address : "http://${mockWebServer.hostName}:${mockWebServer.port}",
retryConfig:[
maxAttempts: 1,
delay: '1ms'
]
]
]
]).setScript(SCRIPT).execute()
Expand Down
Loading