From 06e7f3abb807e0ad542b331ac79c0796915e3145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Stresing?= Date: Sat, 11 Jan 2025 00:35:37 +0100 Subject: [PATCH] Workaround for de-duplication of cloud task --- iam.tf | 12 ++++++------ runner-autoscaler/pkg/srv.go | 29 ++++++++++++++++++++--------- runner-autoscaler/test/main_test.go | 24 ++++++++++++++---------- 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/iam.tf b/iam.tf index 34b6750..61d4405 100644 --- a/iam.tf +++ b/iam.tf @@ -30,10 +30,10 @@ resource "google_project_iam_custom_role" "manage_vm_instances" { permissions = ["compute.instances.get", "compute.instances.start", "compute.instances.stop", "compute.instances.delete", "compute.instances.create", "compute.instances.setMetadata", "compute.instances.setTags", "compute.instances.setServiceAccount"] } -resource "google_project_iam_custom_role" "create_cloud_task" { - role_id = "CreateCloudTask" - title = "Create a Cloud Task" - permissions = ["cloudtasks.tasks.create"] +resource "google_project_iam_custom_role" "create_delete_cloud_task" { + role_id = "CreateDeleteCloudTask" + title = "Create/Delete a Cloud Task" + permissions = ["cloudtasks.tasks.create", "cloudtasks.tasks.delete"] } resource "google_project_iam_custom_role" "create_vm_from_instance_template" { @@ -71,10 +71,10 @@ resource "google_project_iam_member" "manage_vm_instances_member" { } } -resource "google_project_iam_member" "create_cloud_task_member" { +resource "google_project_iam_member" "create_delete_cloud_task_member" { project = local.projectId member = "serviceAccount:${google_service_account.autoscaler_sa.email}" - role = google_project_iam_custom_role.create_cloud_task.id + role = google_project_iam_custom_role.create_delete_cloud_task.id # DOES NOT WORK #condition { diff --git a/runner-autoscaler/pkg/srv.go b/runner-autoscaler/pkg/srv.go index d318a33..9a1fca2 100644 --- a/runner-autoscaler/pkg/srv.go +++ b/runner-autoscaler/pkg/srv.go @@ -499,7 +499,7 @@ func 
(s *Autoscaler) GenerateRunnerJitConfig(ctx context.Context, url string, ru } } -func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job, delay time.Duration) error { +func (s *Autoscaler) CreateCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job, delay time.Duration) error { data, _ := json.Marshal(job) now := timestamppb.Now() @@ -507,7 +507,7 @@ func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string req := &taskspb.CreateTaskRequest{ Parent: s.conf.TaskQueue, Task: &taskspb.Task{ - Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id), + Name: fmt.Sprintf("%s/tasks/%d-0", s.conf.TaskQueue, job.Id), DispatchDeadline: &durationpb.Duration{ Seconds: 120, // the timeout of the cloud task callback - must be greater the time it takes to start the VM Nanos: 0, @@ -531,19 +531,30 @@ func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string defer client.Close() _, err := client.CreateTask(ctx, req) if err != nil { - return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err) + // parse error so we can work around de-duplication + if match, _ := regexp.MatchString("code = AlreadyExists", err.Error()); match { + req.Task.Name = fmt.Sprintf("%s/tasks/%d-1", s.conf.TaskQueue, job.Id) + _, err := client.CreateTask(ctx, req) + if err != nil { + return fmt.Errorf("cloudtasks.CreateTask finally failed for job Id %d: %v", job.Id, err) + } else { + log.Infof("Finally created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data) + } + } else { + return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err) + } } else { log.Infof("Created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data) } return nil } -func (s *Autoscaler) deleteCallbackTask(ctx context.Context, job Job) error { +func (s *Autoscaler) DeleteCallbackTask(ctx 
context.Context, job Job) error { client := newTaskClient(ctx) defer client.Close() err := client.DeleteTask(ctx, &taskspb.DeleteTaskRequest{ - Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id), + Name: fmt.Sprintf("%s/tasks/%d-0", s.conf.TaskQueue, job.Id), }) if err != nil { return fmt.Errorf("cloudtasks.DeleteTask failed for job Id %d: %v", job.Id, err) @@ -670,7 +681,7 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) { if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok { createUrl := createCallbackUrl(ctx, s.conf.RouteCreateVm, s.conf.SourceQueryParam, src.Name) // delay the create vm callback so we have a chance to delete it if the workflow job is changing its state to 'waiting' - if err := s.createCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job, time.Duration(s.conf.CreateVmDelay)*time.Second); err != nil { + if err := s.CreateCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job, time.Duration(s.conf.CreateVmDelay)*time.Second); err != nil { log.Errorf("Can not enqueue create-vm cloud task callback: %s", err.Error()) ctx.AbortWithError(http.StatusInternalServerError, err) return @@ -681,7 +692,7 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) { } else if payload.Action == WAITING { // the waiting action happens if a deployment environment is configured in the workflow that requires a review. 
We have to cancel the cloud task callback if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok { - if err := s.deleteCallbackTask(ctx, payload.Job); err != nil { + if err := s.DeleteCallbackTask(ctx, payload.Job); err != nil { // best effort - this is not considered an error log.Warnf("Can not delete create-vm cloud task callback: %s", err.Error()) } @@ -697,10 +708,10 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) { if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok { // if the user immediately cancels a workflow we have the chance to delete the callback if not older than 10 seconds - best effort, ignore all errors - s.deleteCallbackTask(ctx, payload.Job) + s.DeleteCallbackTask(ctx, payload.Job) deleteUrl := createCallbackUrl(ctx, s.conf.RouteDeleteVm, s.conf.SourceQueryParam, src.Name) - if err := s.createCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job, 1*time.Second); err != nil { + if err := s.CreateCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job, 1*time.Second); err != nil { log.Errorf("Can not enqueue delete-vm cloud task callback: %s", err.Error()) ctx.AbortWithError(http.StatusInternalServerError, err) return diff --git a/runner-autoscaler/test/main_test.go b/runner-autoscaler/test/main_test.go index c5bcaac..0ca60c9 100644 --- a/runner-autoscaler/test/main_test.go +++ b/runner-autoscaler/test/main_test.go @@ -3,6 +3,8 @@ package test import ( "context" "fmt" + "math" + "math/rand" "net/http" "net/url" "strings" @@ -64,16 +66,6 @@ func TestWebhookSignature(t *testing.T) { assert.Equal(t, 200, resp.StatusCode) } -/* -func TestGenerateRunnerRegistrationToken(t *testing.T) { - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - token, err := scaler.GenerateRunnerRegistrationToken(ctx) - assert.Nil(t, err) - assert.NotEmpty(t, token) -}*/ - func TestGenerateRunnerJitConfig(t *testing.T) { ctx, cancel := 
context.WithTimeout(context.Background(), 60*time.Second) @@ -93,6 +85,18 @@ func TestGetMagicLabelValue(t *testing.T) { assert.Equal(t, "test", *result) } +func TestCreateCallbackTask(t *testing.T) { + + job := pkg.Job{ + Id: rand.Int63n(math.MaxInt64), + Labels: []string{"test", "@foo:bar", "@machine:test"}, + } + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + err := scaler.DeleteCallbackTask(ctx, job) + assert.Nil(t, err) +} + func TestHasAllLabels(t *testing.T) { job := pkg.Job{