Skip to content

Commit

Permalink
Increase num retries for Workaround for de-duplication of cloud task
Browse files Browse the repository at this point in the history
  • Loading branch information
Tereius committed Jan 11, 2025
1 parent 06e7f3a commit 29fcb7d
Showing 1 changed file with 12 additions and 78 deletions.
90 changes: 12 additions & 78 deletions runner-autoscaler/pkg/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,48 +409,6 @@ func (s *Autoscaler) readPat(ctx context.Context) (string, error) {
}
}

/*
func (s *Autoscaler) GenerateRunnerRegistrationToken(ctx context.Context) (string, error) {
log.Debugf("About to request GitHub runner registration token using PAT from secret version: %s", s.conf.SecretVersion)
secretAccessClient := newSecretAccessClient(ctx)
defer secretAccessClient.Close()
if pat, err := s.readPat(ctx); err != nil {
return "", err
} else {
if req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(RUNNER_REGISTER_TOKEN_ORG_ENDPOINT, s.conf.GitHubOrg), nil); err != nil {
log.Errorf("Could not create GitHub runner registration token request")
return "", fmt.Errorf("failed registration token request")
} else {
req.Header.Add("Accept", "application/vnd.github+json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", pat))
req.Header.Add("X-GitHub-Api-Version", GITHUB_API_VERSION)
req.Header.Add("User-Agent", "github-runner-autoscaler")
if resp, err := http.DefaultClient.Do(req); err != nil {
log.Errorf("GitHub runner registration token request failed: %s", err.Error())
return "", fmt.Errorf("failed registration token response")
} else if resp.StatusCode != 201 {
log.Errorf("GitHub runner registration token request unsuccessful: %s", resp.Status)
defer resp.Body.Close()
return "", fmt.Errorf("failed registration token response")
} else {
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
payload := map[string]string{}
if err := json.Unmarshal(body, &payload); err != nil {
log.Errorf("GitHub runner registration token response missing: %s", err.Error())
return "", fmt.Errorf("failed registration token response")
} else if token, ok := payload["token"]; ok && len(token) > 0 {
return token, nil
} else {
log.Errorf("GitHub runner registration token is empty")
return "", fmt.Errorf("failed registration token response")
}
}
}
}
}*/

// A jit-config needs: RunnerName, RunnerGroupId, Labels, WorkFolder
func (s *Autoscaler) GenerateRunnerJitConfig(ctx context.Context, url string, runnerName string, runnerGroupId int64, labels []string) (string, error) {

Expand Down Expand Up @@ -507,7 +465,6 @@ func (s *Autoscaler) CreateCallbackTaskWithToken(ctx context.Context, url string
req := &taskspb.CreateTaskRequest{
Parent: s.conf.TaskQueue,
Task: &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%d-0", s.conf.TaskQueue, job.Id),
DispatchDeadline: &durationpb.Duration{
Seconds: 120, // the timeout of the cloud task callback - must be greater the time it takes to start the VM
Nanos: 0,
Expand All @@ -524,29 +481,27 @@ func (s *Autoscaler) CreateCallbackTaskWithToken(ctx context.Context, url string
},
},
}

req.Task.GetHttpRequest().Body = []byte(data)

client := newTaskClient(ctx)
defer client.Close()
_, err := client.CreateTask(ctx, req)
if err != nil {
// parse error so we can workaround de-duplication
if match, _ := regexp.MatchString("code = AlreadyExists", err.Error()); match {
req.Task.Name = fmt.Sprintf("%s/tasks/%d-1", s.conf.TaskQueue, job.Id)
_, err := client.CreateTask(ctx, req)
if err != nil {
return fmt.Errorf("cloudtasks.CreateTask finally failed for job Id %d: %v", job.Id, err)

var sendAndRetry func(int) error
sendAndRetry = func(retryCount int) error {
req.Task.Name = fmt.Sprintf("%s/tasks/%d-%d", s.conf.TaskQueue, job.Id, retryCount)
if _, err := client.CreateTask(ctx, req); err != nil {
if retry, _ := regexp.MatchString("code = AlreadyExists", err.Error()); retry && retryCount < 2 {
return sendAndRetry(retryCount + 1)
} else {
log.Infof("Finally created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err)
}
} else {
return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err)
log.Infof("Created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
return nil
}
} else {
log.Infof("Created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
}
return nil

return sendAndRetry(0)
}

func (s *Autoscaler) DeleteCallbackTask(ctx context.Context, job Job) error {
Expand All @@ -573,27 +528,6 @@ chmod +x ./runner_startup.sh
rm runner_startup.sh
`

/*
func (s *Autoscaler) createVmWithRegistrationToken(ctx *gin.Context, instanceName string) {
if token, err := s.GenerateRunnerRegistrationToken(ctx); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, err)
} else {
registration_token_attr := fmt.Sprintf("%s_%s", RUNNER_REGISTRATION_TOKEN_ATTR, RandStringRunes(16))
if err := s.CreateInstanceFromTemplate(ctx, instanceName, &computepb.Items{
Key: proto.String(registration_token_attr),
Value: proto.String(token),
}, &computepb.Items{
Key: proto.String("startup-script"),
Value: proto.String(fmt.Sprintf(runner_script_wrapper, registration_token_attr, RUNNER_SCRIPT_REGISTER_RUNNER_ATTR)),
}); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, err)
} else {
ctx.Status(http.StatusOK)
}
}
}*/

func (s *Autoscaler) createVmWithJitConfig(ctx *gin.Context, url string, runnerGroupId int64, settings VmSettings, labels []string) {

if jitConfig, err := s.GenerateRunnerJitConfig(ctx, url, settings.Name, runnerGroupId, labels); err != nil {
Expand Down

0 comments on commit 29fcb7d

Please sign in to comment.