Skip to content

Commit

Permalink
Workaround for de-duplication of cloud task
Browse files Browse the repository at this point in the history
  • Loading branch information
Tereius committed Jan 10, 2025
1 parent 42dc9a1 commit 06e7f3a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
12 changes: 6 additions & 6 deletions iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ resource "google_project_iam_custom_role" "manage_vm_instances" {
permissions = ["compute.instances.get", "compute.instances.start", "compute.instances.stop", "compute.instances.delete", "compute.instances.create", "compute.instances.setMetadata", "compute.instances.setTags", "compute.instances.setServiceAccount"]
}

resource "google_project_iam_custom_role" "create_cloud_task" {
role_id = "CreateCloudTask"
title = "Create a Cloud Task"
permissions = ["cloudtasks.tasks.create"]
# Custom project role that grants exactly two Cloud Tasks permissions:
# creating tasks and deleting tasks.
resource "google_project_iam_custom_role" "create_delete_cloud_task" {
  role_id = "CreateDeleteCloudTask"
  title   = "Create/Delete a Cloud Task"
  permissions = [
    "cloudtasks.tasks.create",
    "cloudtasks.tasks.delete",
  ]
}

resource "google_project_iam_custom_role" "create_vm_from_instance_template" {
Expand Down Expand Up @@ -71,10 +71,10 @@ resource "google_project_iam_member" "manage_vm_instances_member" {
}
}

resource "google_project_iam_member" "create_cloud_task_member" {
resource "google_project_iam_member" "create_delete_cloud_task_member" {
project = local.projectId
member = "serviceAccount:${google_service_account.autoscaler_sa.email}"
role = google_project_iam_custom_role.create_cloud_task.id
role = google_project_iam_custom_role.create_delete_cloud_task.id

# DOES NOT WORK
#condition {
Expand Down
29 changes: 20 additions & 9 deletions runner-autoscaler/pkg/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,15 +499,15 @@ func (s *Autoscaler) GenerateRunnerJitConfig(ctx context.Context, url string, ru
}
}

func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job, delay time.Duration) error {
func (s *Autoscaler) CreateCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job, delay time.Duration) error {

data, _ := json.Marshal(job)
now := timestamppb.Now()
now.Seconds += int64(delay.Seconds())
req := &taskspb.CreateTaskRequest{
Parent: s.conf.TaskQueue,
Task: &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id),
Name: fmt.Sprintf("%s/tasks/%d-0", s.conf.TaskQueue, job.Id),
DispatchDeadline: &durationpb.Duration{
Seconds: 120, // the timeout of the cloud task callback - must be greater than the time it takes to start the VM
Nanos: 0,
Expand All @@ -531,19 +531,30 @@ func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string
defer client.Close()
_, err := client.CreateTask(ctx, req)
if err != nil {
return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err)
// inspect the error text so we can work around task-name de-duplication
if match, _ := regexp.MatchString("code = AlreadyExists", err.Error()); match {
req.Task.Name = fmt.Sprintf("%s/tasks/%d-1", s.conf.TaskQueue, job.Id)
_, err := client.CreateTask(ctx, req)
if err != nil {
return fmt.Errorf("cloudtasks.CreateTask finally failed for job Id %d: %v", job.Id, err)
} else {
log.Infof("Finally created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
}
} else {
return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err)
}
} else {
log.Infof("Created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
}
return nil
}

func (s *Autoscaler) deleteCallbackTask(ctx context.Context, job Job) error {
func (s *Autoscaler) DeleteCallbackTask(ctx context.Context, job Job) error {

client := newTaskClient(ctx)
defer client.Close()
err := client.DeleteTask(ctx, &taskspb.DeleteTaskRequest{
Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id),
Name: fmt.Sprintf("%s/tasks/%d-0", s.conf.TaskQueue, job.Id),
})
if err != nil {
return fmt.Errorf("cloudtasks.DeleteTask failed for job Id %d: %v", job.Id, err)
Expand Down Expand Up @@ -670,7 +681,7 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) {
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {
createUrl := createCallbackUrl(ctx, s.conf.RouteCreateVm, s.conf.SourceQueryParam, src.Name)
// delay the create vm callback so we have a chance to delete it if the workflow job is changing its state to 'waiting'
if err := s.createCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job, time.Duration(s.conf.CreateVmDelay)*time.Second); err != nil {
if err := s.CreateCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job, time.Duration(s.conf.CreateVmDelay)*time.Second); err != nil {
log.Errorf("Can not enqueue create-vm cloud task callback: %s", err.Error())
ctx.AbortWithError(http.StatusInternalServerError, err)
return
Expand All @@ -681,7 +692,7 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) {
} else if payload.Action == WAITING {
// the waiting action happens if a deployment environment is configured in the workflow that requires a review. We have to cancel the cloud task callback
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {
if err := s.deleteCallbackTask(ctx, payload.Job); err != nil {
if err := s.DeleteCallbackTask(ctx, payload.Job); err != nil {
// best effort - this is not considered an error
log.Warnf("Can not delete create-vm cloud task callback: %s", err.Error())
}
Expand All @@ -697,10 +708,10 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) {
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {

// if the user immediately cancels a workflow we have the chance to delete the callback if not older than 10 seconds - best effort, ignore all errors
s.deleteCallbackTask(ctx, payload.Job)
s.DeleteCallbackTask(ctx, payload.Job)

deleteUrl := createCallbackUrl(ctx, s.conf.RouteDeleteVm, s.conf.SourceQueryParam, src.Name)
if err := s.createCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job, 1*time.Second); err != nil {
if err := s.CreateCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job, 1*time.Second); err != nil {
log.Errorf("Can not enqueue delete-vm cloud task callback: %s", err.Error())
ctx.AbortWithError(http.StatusInternalServerError, err)
return
Expand Down
24 changes: 14 additions & 10 deletions runner-autoscaler/test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package test
import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -64,16 +66,6 @@ func TestWebhookSignature(t *testing.T) {
assert.Equal(t, 200, resp.StatusCode)
}

/*
func TestGenerateRunnerRegistrationToken(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
token, err := scaler.GenerateRunnerRegistrationToken(ctx)
assert.Nil(t, err)
assert.NotEmpty(t, token)
}*/

func TestGenerateRunnerJitConfig(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand All @@ -93,6 +85,18 @@ func TestGetMagicLabelValue(t *testing.T) {
assert.Equal(t, "test", *result)
}

// TestCreateCallbackTask calls scaler.DeleteCallbackTask for a job with a
// random Id and a fixed label set, bounded by a 60 second timeout, and
// asserts that the returned error is nil.
//
// NOTE(review): despite its name this test never creates a task; it only
// invokes DeleteCallbackTask. Confirm whether a preceding create call is
// intended.
func TestCreateCallbackTask(t *testing.T) {
	timeoutCtx, stop := context.WithTimeout(context.Background(), 60*time.Second)
	defer stop()

	// A random non-negative Id is drawn on every run.
	randomJob := pkg.Job{
		Labels: []string{"test", "@foo:bar", "@machine:test"},
		Id:     rand.Int63n(math.MaxInt64),
	}

	assert.Nil(t, scaler.DeleteCallbackTask(timeoutCtx, randomJob))
}

func TestHasAllLabels(t *testing.T) {

job := pkg.Job{
Expand Down

0 comments on commit 06e7f3a

Please sign in to comment.