From e4714e215a54e49ee5a2f8f9077aa559ac612c1b Mon Sep 17 00:00:00 2001 From: pawelharacz Date: Mon, 27 May 2024 22:00:22 +0200 Subject: [PATCH] Day 88/100 commits - add event handler for IaC Apply scheduled --- internal/eventbus/events/apply_iac.go | 4 +- internal/eventbus/events/scheduled_plan.go | 2 +- internal/handlers/event_handler_factory.go | 5 +- .../handlers/scheduled_iac_apply_handler.go | 54 +++++++++++++++++++ internal/handlers/scheduled_plan_handler.go | 5 +- internal/services/iacService.go | 2 +- 6 files changed, 65 insertions(+), 7 deletions(-) create mode 100644 internal/handlers/scheduled_iac_apply_handler.go diff --git a/internal/eventbus/events/apply_iac.go b/internal/eventbus/events/apply_iac.go index aebfa7d..f5686c6 100644 --- a/internal/eventbus/events/apply_iac.go +++ b/internal/eventbus/events/apply_iac.go @@ -8,7 +8,7 @@ import ( const IAC_APPLY_SCHEDULED EventName = "iac_apply_scheduled" -type IacApplied struct { +type IacApplyScheduled struct { ChangeId uuid.UUID ProjectId uuid.UUID PlanId uuid.UUID @@ -16,6 +16,6 @@ type IacApplied struct { Owner string } -func (i IacApplied) MarshalBinary() ([]byte, error) { +func (i IacApplyScheduled) MarshalBinary() ([]byte, error) { return json.Marshal(i) } diff --git a/internal/eventbus/events/scheduled_plan.go b/internal/eventbus/events/scheduled_plan.go index 276c085..39bcc8e 100644 --- a/internal/eventbus/events/scheduled_plan.go +++ b/internal/eventbus/events/scheduled_plan.go @@ -6,7 +6,7 @@ import ( "time" ) -const SCHEDULED_PLAN = "scheduled_plan" +const SCHEDULED_PLAN EventName = "scheduled_plan" type ScheduledPlan struct { ProjectId uuid.UUID diff --git a/internal/handlers/event_handler_factory.go b/internal/handlers/event_handler_factory.go index b6daa43..6374cbc 100644 --- a/internal/handlers/event_handler_factory.go +++ b/internal/handlers/event_handler_factory.go @@ -25,7 +25,7 @@ func NewEventHandlerFactory(eventSubscriber eb.EventSubscriber, eventPublisher e eventSubscriber: eventSubscriber, unitOfWork: unitOfWork, eventPublisher: eventPublisher, - allowedEvents: []events.EventName{events.LEASE_LOCK, events.TRIGGERED_PLAN, events.SCHEDULED_PLAN}, + allowedEvents: []events.EventName{events.LEASE_LOCK, events.TRIGGERED_PLAN, events.SCHEDULED_PLAN, events.IAC_APPLY_SCHEDULED}, } } @@ -43,6 +43,9 @@ func (factory *EventHandlerFactory) RegisterHandler(event events.EventName) (Eve case events.SCHEDULED_PLAN: factory.allowedEvents = helpers.Remove(factory.allowedEvents, event) return newScheduledPlanHandler(factory.eventSubscriber, factory.unitOfWork, factory.eventPublisher) + case events.IAC_APPLY_SCHEDULED: + factory.allowedEvents = helpers.Remove(factory.allowedEvents, event) + return newScheduledIaCApplyHandler(factory.eventSubscriber, factory.unitOfWork) } return nil, MissingHandlerImplementedFactory } diff --git a/internal/handlers/scheduled_iac_apply_handler.go b/internal/handlers/scheduled_iac_apply_handler.go new file mode 100644 index 0000000..639ce93 --- /dev/null +++ b/internal/handlers/scheduled_iac_apply_handler.go @@ -0,0 +1,54 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "golang.org/x/net/context" + eb "labraboard/internal/eventbus" + "labraboard/internal/eventbus/events" + "labraboard/internal/logger" + "labraboard/internal/repositories" +) + +type scheduledIaCApplyHandler struct { + eventSubscriber eb.EventSubscriber + unitOfWork *repositories.UnitOfWork +} + +func newScheduledIaCApplyHandler(eventSubscriber eb.EventSubscriber, unitOfWork *repositories.UnitOfWork) (*scheduledIaCApplyHandler, error) { + return &scheduledIaCApplyHandler{ + eventSubscriber, + unitOfWork, + }, nil +} + +func (handler *scheduledIaCApplyHandler) Handle(ctx context.Context) { + log := logger.GetWitContext(ctx).With().Str("event", string(events.IAC_APPLY_SCHEDULED)).Logger() + locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, log.WithContext(ctx)) + for msg := range locks { + var event = events.IacApplyScheduled{} + err := json.Unmarshal(msg, &event) + if err != nil { + log.Error().Err(fmt.Errorf("cannot handle message type %T", event)) + } + log.Info().Msgf("Received message: %s", msg) + go handler.handle(event, log.WithContext(ctx)) + } +} + +func (handler *scheduledIaCApplyHandler) handle(event events.IacApplyScheduled, ctx context.Context) { + log := logger.GetWitContext(ctx). + With(). + Str("changeId", event.ChangeId.String()). + Str("iacType", string(event.IacType)). + Str("planId", event.PlanId.String()). + Str("projectId", event.ProjectId.String()). + Str("owner", event.Owner). + Logger() + _, err := handler.unitOfWork.IacPlan.Get(event.PlanId, log.WithContext(ctx)) + if err != nil { + log.Error().Err(err) + return + } + +} diff --git a/internal/handlers/scheduled_plan_handler.go b/internal/handlers/scheduled_plan_handler.go index fa2f4a7..d38e8fd 100644 --- a/internal/handlers/scheduled_plan_handler.go +++ b/internal/handlers/scheduled_plan_handler.go @@ -24,9 +24,10 @@ func newScheduledPlanHandler(eventSubscriber eb.EventSubscriber, unitOfWork *rep publisher, }, nil } + func (handler *scheduledPlanHandler) Handle(ctx context.Context) { - log := logger.GetWitContext(ctx).With().Str("event", events.SCHEDULED_PLAN).Logger() - locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, ctx) + log := logger.GetWitContext(ctx).With().Str("event", string(events.SCHEDULED_PLAN)).Logger() + locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, log.WithContext(ctx)) for msg := range locks { var event = events.ScheduledPlan{} err := json.Unmarshal(msg, &event) diff --git a/internal/services/iacService.go b/internal/services/iacService.go index 482c8cb..31d7943 100644 --- a/internal/services/iacService.go +++ b/internal/services/iacService.go @@ -283,7 +283,7 @@ func (svc *IacService) ScheduleApply(projectId uuid.UUID, planId uuid.UUID, ctx } var changeId = uuid.New() - var event = &events.IacApplied{ + var event = &events.IacApplyScheduled{ ChangeId: changeId, ProjectId: projectId, PlanId: planId,