Skip to content

Commit

Permalink
Day 88/100 commits - add event handler for IaC Apply scheduled
Browse files Browse the repository at this point in the history
  • Loading branch information
PawelHaracz committed May 27, 2024
1 parent 14abd80 commit e4714e2
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 7 deletions.
4 changes: 2 additions & 2 deletions internal/eventbus/events/apply_iac.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

const IAC_APPLY_SCHEDULED EventName = "iac_apply_scheduled"

type IacApplied struct {
type IacApplyScheduled struct {
ChangeId uuid.UUID
ProjectId uuid.UUID
PlanId uuid.UUID
IacType aggregates.IaCPlanType
Owner string
}

func (i IacApplied) MarshalBinary() ([]byte, error) {
func (i IacApplyScheduled) MarshalBinary() ([]byte, error) {
return json.Marshal(i)
}
2 changes: 1 addition & 1 deletion internal/eventbus/events/scheduled_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const SCHEDULED_PLAN = "scheduled_plan"
const SCHEDULED_PLAN EventName = "scheduled_plan"

type ScheduledPlan struct {
ProjectId uuid.UUID
Expand Down
5 changes: 4 additions & 1 deletion internal/handlers/event_handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewEventHandlerFactory(eventSubscriber eb.EventSubscriber, eventPublisher e
eventSubscriber: eventSubscriber,
unitOfWork: unitOfWork,
eventPublisher: eventPublisher,
allowedEvents: []events.EventName{events.LEASE_LOCK, events.TRIGGERED_PLAN, events.SCHEDULED_PLAN},
allowedEvents: []events.EventName{events.LEASE_LOCK, events.TRIGGERED_PLAN, events.SCHEDULED_PLAN, events.IAC_APPLY_SCHEDULED},
}
}

Expand All @@ -43,6 +43,9 @@ func (factory *EventHandlerFactory) RegisterHandler(event events.EventName) (Eve
case events.SCHEDULED_PLAN:
factory.allowedEvents = helpers.Remove(factory.allowedEvents, event)
return newScheduledPlanHandler(factory.eventSubscriber, factory.unitOfWork, factory.eventPublisher)
case events.IAC_APPLY_SCHEDULED:
factory.allowedEvents = helpers.Remove(factory.allowedEvents, event)
return newScheduledIaCApplyHandler(factory.eventSubscriber, factory.unitOfWork)
}
return nil, MissingHandlerImplementedFactory
}
Expand Down
54 changes: 54 additions & 0 deletions internal/handlers/scheduled_iac_apply_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package handlers

import (
"encoding/json"
"fmt"
"golang.org/x/net/context"
eb "labraboard/internal/eventbus"
"labraboard/internal/eventbus/events"
"labraboard/internal/logger"
"labraboard/internal/repositories"
)

type scheduledIaCApplyHandler struct {
eventSubscriber eb.EventSubscriber
unitOfWork *repositories.UnitOfWork
}

func newScheduledIaCApplyHandler(eventSubscriber eb.EventSubscriber, unitOfWork *repositories.UnitOfWork) (*scheduledIaCApplyHandler, error) {
return &scheduledIaCApplyHandler{
eventSubscriber,
unitOfWork,
}, nil
}

func (handler *scheduledIaCApplyHandler) Handle(ctx context.Context) {
log := logger.GetWitContext(ctx).With().Str("event", string(events.IAC_APPLY_SCHEDULED)).Logger()
locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, log.WithContext(ctx))
for msg := range locks {
var event = events.IacApplyScheduled{}
err := json.Unmarshal(msg, &event)
if err != nil {
log.Error().Err(fmt.Errorf("cannot handle message type %T", event))
}
log.Info().Msgf("Received message: %s", msg)
go handler.handle(event, log.WithContext(ctx))
}
}

func (handler *scheduledIaCApplyHandler) handle(event events.IacApplyScheduled, ctx context.Context) {
log := logger.GetWitContext(ctx).
With().
Str("changeId", event.ChangeId.String()).
Str("iacType", string(event.IacType)).
Str("planId", event.PlanId.String()).
Str("projectId", event.ProjectId.String()).
Str("owner", event.Owner).
Logger()
_, err := handler.unitOfWork.IacPlan.Get(event.PlanId, log.WithContext(ctx))
if err != nil {
log.Error().Err(err)
return
}

}
5 changes: 3 additions & 2 deletions internal/handlers/scheduled_plan_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func newScheduledPlanHandler(eventSubscriber eb.EventSubscriber, unitOfWork *rep
publisher,
}, nil
}

func (handler *scheduledPlanHandler) Handle(ctx context.Context) {
log := logger.GetWitContext(ctx).With().Str("event", events.SCHEDULED_PLAN).Logger()
locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, ctx)
log := logger.GetWitContext(ctx).With().Str("event", string(events.SCHEDULED_PLAN)).Logger()
locks := handler.eventSubscriber.Subscribe(events.SCHEDULED_PLAN, log.WithContext(ctx))
for msg := range locks {
var event = events.ScheduledPlan{}
err := json.Unmarshal(msg, &event)
Expand Down
2 changes: 1 addition & 1 deletion internal/services/iacService.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (svc *IacService) ScheduleApply(projectId uuid.UUID, planId uuid.UUID, ctx
}

var changeId = uuid.New()
var event = &events.IacApplied{
var event = &events.IacApplyScheduled{
ChangeId: changeId,
ProjectId: projectId,
PlanId: planId,
Expand Down

0 comments on commit e4714e2

Please sign in to comment.