Skip to content

Commit

Permalink
Day 93/100 commits - Implement apply on the event handler and change …
Browse files Browse the repository at this point in the history
…method to read events, adjust plan to be compatible with assembler, add more logs
  • Loading branch information
PawelHaracz committed Jun 1, 2024
1 parent 69c5bf2 commit 7e7b575
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 83 deletions.
12 changes: 12 additions & 0 deletions .run/handlers.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="handlers" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="labraboard" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="--config $PROJECT_DIR$/config.yaml" />
<kind value="PACKAGE" />
<package value="labraboard/cmd/handlers" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/handlers/main.go" />
<method v="2" />
</configuration>
</component>
12 changes: 12 additions & 0 deletions .run/monolith.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="monolith" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="labraboard" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="--config $PROJECT_DIR$/config.yaml" />
<kind value="PACKAGE" />
<package value="labraboard/cmd/monolith" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/monolith/main.go" />
<method v="2" />
</configuration>
</component>
2 changes: 2 additions & 0 deletions internal/aggregates/iac_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ func (plan *IacPlan) Composite() (planJson []byte, planType IaCPlanType, changes
func (p *IacPlan) GetPlanType() string {
return string(p.planType)
}

func (p *IacPlan) GetPlanRaw() []byte { return p.planRaw }
23 changes: 15 additions & 8 deletions internal/eventbus/redisEventBus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,22 @@ func (r *EventBus) Subscribe(key events.EventName, ctx context.Context) chan []b
go func() {
defer close(item) //check it
for {
msg, err := subscriber.ReceiveMessage(ctx)
if err != nil {
// handle error, for example log it and return
log.Error().Err(err)
return
}
msg, err := subscriber.Receive(ctx)
switch v := msg.(type) {
case redis.Message:
if err != nil {
// handle error, for example log it and return
log.Error().Err(err)
return
}

item <- []byte(msg.Payload)
log.Info().Msgf("Received message from %s channel", msg.Channel)
item <- []byte(v.Payload)
log.Info().Msgf("Received message from %s channel", v.Channel)
case redis.Subscription:
log.Info().Msgf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Error().Err(v).Msg("cannot receive message")
}
}
}()

Expand Down
27 changes: 27 additions & 0 deletions internal/handlers/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package handlers

import (
"fmt"
"os"
)

func createBackendFile(path string, statePath string) error {
content := `terraform {
backend "local" {
path = "%s"
}
}`

file, err := os.Create(fmt.Sprintf("%s/backend_override.tf", path))
if err != nil {
return err
}
defer file.Close()

_, err = fmt.Fprintf(file, content, statePath)
if err != nil {
return err
}

return nil
}
83 changes: 38 additions & 45 deletions internal/handlers/plan_worker_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/pkg/errors"
"labraboard/internal/aggregates"
eb "labraboard/internal/eventbus"
"labraboard/internal/eventbus/events"
Expand Down Expand Up @@ -42,34 +43,18 @@ func (handler *triggeredPlanHandler) Handle(ctx context.Context) {
if err != nil {
log.Error().Err(fmt.Errorf("cannot handle message type %T", event))
}
log.Info().Msgf("Received message: %s", msg)
handler.handlePlanTriggered(event, log.WithContext(ctx))
}

}

func createBackendFile(path string, statePath string) error {
content := `terraform {
backend "local" {
path = "%s"
}
}`

file, err := os.Create(fmt.Sprintf("%s/backend_override.tf", path))
if err != nil {
return err
}
defer file.Close()

_, err = fmt.Fprintf(file, content, statePath)
if err != nil {
return err
log.Trace().Msgf("Received message: %s", msg)
err = handler.handlePlanTriggered(event, log.WithContext(ctx))
if err != nil {
log.Error().Err(err).Msgf("Cannot successful create plan %s", event.PlanId)
} else {
log.Info().Msgf("successful create the plan %s", event.PlanId)
}
}

return nil
}

func (handler *triggeredPlanHandler) handlePlanTriggered(obj events.PlanTriggered, ctx context.Context) {
func (handler *triggeredPlanHandler) handlePlanTriggered(obj events.PlanTriggered, ctx context.Context) error {
log := logger.GetWitContext(ctx).With().Str("planId", obj.PlanId.String()).Str("projectId", obj.ProjectId.String()).Logger()
var input = iacSvc.Input{
ProjectId: obj.ProjectId,
Expand All @@ -84,14 +69,15 @@ func (handler *triggeredPlanHandler) handlePlanTriggered(obj events.PlanTriggere

if err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "Cannot assembly of event")
}

folderPath := fmt.Sprintf("/tmp/%s", assembly.PlanId)
tofuFolderPath := fmt.Sprintf("%s/%s", folderPath, assembly.RepoPath)

gitRepo, err := git.PlainClone(folderPath, false, &git.CloneOptions{
URL: assembly.RepoUrl,
Progress: os.Stdout,
Progress: nil, //os.Stdout,
})

defer func(folderPath string) {
Expand All @@ -106,47 +92,52 @@ func (handler *triggeredPlanHandler) handlePlanTriggered(obj events.PlanTriggere
case models.TAG:
tag, err := gitRepo.Tag(assembly.CommitName)
if err != nil {
log.Error().Err(err)
return
log.Error().Err(err).Msg(err.Error())
return errors.Wrap(err, fmt.Sprintf("Cannot checkin tag %s", assembly.CommitName))
}
commitSha = tag.Hash().String()
case models.SHA:
object, err := gitRepo.CommitObject(plumbing.NewHash(assembly.CommitName))
if err != nil {
log.Error().Err(err)
return
log.Error().Err(err).Msg(err.Error())
return errors.Wrap(err, fmt.Sprintf("Cannot checkin commit %s", assembly.CommitName))
}
commitSha = object.Hash.String()
case models.BRANCH:
branchConfig, err := gitRepo.CommitObject(plumbing.NewHash(assembly.CommitName))
branchConfig, err := gitRepo.Branch(assembly.CommitName)
if err != nil {
log.Error().Err(err)
return
log.Error().Err(err).Msg(err.Error())
return errors.Wrap(err, fmt.Sprintf("Cannot checkin branch %s", assembly.CommitName))
}
commitSha = branchConfig.Hash.String()
commitSha = branchConfig.Name //fix to have hash
}

if err = createBackendFile(tofuFolderPath, "./.local-state"); err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "Cannot create backend")
}

tofu, err := iacSvc.NewTofuIacService(tofuFolderPath)
if err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "Cannot initialize tofu")
}

iacTerraformPlanJson, err := tofu.Plan(assembly.InlineEnvVariable(), assembly.InlineVariable(), log.WithContext(ctx))
iac, err := handler.unitOfWork.IacRepository.Get(input.ProjectId, log.WithContext(ctx))
if err != nil {
log.Error().Err(err).Msg("missing project")
return errors.Wrap(err, "missing project")
}

iacTerraformPlanJson, err := tofu.Plan(assembly.InlineEnvVariable(), assembly.InlineVariable(), log.WithContext(ctx))
if err != nil {
iac.UpdatePlan(obj.PlanId, vo.Failed)
log.Warn().Err(err).Msg(err.Error())
if err = handler.unitOfWork.IacRepository.Update(iac, log.WithContext(ctx)); err != nil {
log.Warn().Err(err).Msg(err.Error())
return
return errors.Wrap(err, "cannot update iac")
}
return
return errors.Wrap(err, "failed generate plan")
}

historyEnvs := make([]vo.IaCEnv, len(assembly.EnvVariables))
Expand Down Expand Up @@ -177,24 +168,26 @@ func (handler *triggeredPlanHandler) handlePlanTriggered(obj events.PlanTriggere
plan, err = aggregates.NewIacPlan(obj.PlanId, aggregates.Tofu, historyConfiguration)
if err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "cannot create new plan aggregate")
}
if err = handler.unitOfWork.IacPlan.Add(plan, log.WithContext(ctx)); err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "cannot save plan aggregate into db")
}
}

plan.AddPlan(iacTerraformPlanJson.GetPlan())
plan.AddChanges(iacTerraformPlanJson.GetChanges()...)
iac.UpdatePlan(obj.PlanId, vo.Succeed) //optimistic change :)

if err = handler.unitOfWork.IacPlan.Update(plan, log.WithContext(ctx)); err != nil {
iac.UpdatePlan(obj.PlanId, vo.Failed)
log.Error().Err(err)
return
log.Error().Err(err).Msg("Cannot update plan aggregate into db")
return errors.Wrap(err, "Cannot update plan aggregate into db")
}
if err = handler.unitOfWork.IacRepository.Update(iac, log.WithContext(ctx)); err != nil {
log.Error().Err(err)
return
return errors.Wrap(err, "Cannot update iac aggregate into db")
}
log.Info().Msg("successful handle the event")
return nil
}
92 changes: 89 additions & 3 deletions internal/handlers/scheduled_iac_apply_handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package handlers

import (
"bufio"
"encoding/json"
"fmt"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/pkg/errors"
"golang.org/x/net/context"
eb "labraboard/internal/eventbus"
"labraboard/internal/eventbus/events"
"labraboard/internal/logger"
"labraboard/internal/models"
"labraboard/internal/repositories"
"labraboard/internal/services/iac"
"os"
)

type scheduledIaCApplyHandler struct {
Expand Down Expand Up @@ -38,6 +44,7 @@ func (handler *scheduledIaCApplyHandler) Handle(ctx context.Context) {
}

func (handler *scheduledIaCApplyHandler) handle(event events.IacApplyScheduled, ctx context.Context) {
const tfPlanPath = "plan.tfplan"
log := logger.GetWitContext(ctx).
With().
Str("changeId", event.ChangeId.String()).
Expand All @@ -60,12 +67,91 @@ func (handler *scheduledIaCApplyHandler) handle(event events.IacApplyScheduled,
RepoPath: "",
}

_, err := assembler.Assemble(input, log.WithContext(ctx))
output, err := assembler.Assemble(input, log.WithContext(ctx))
if err != nil {
log.Error().Err(err)
return
}
//save tfplan to tfplan
//run apply

if len(output.PlanRaw) == 0 {
err = errors.New("Missing plan")
log.Error().Err(err)
return
}

folderPath := fmt.Sprintf("/tmp/%s/apply", output.PlanId)
tofuFolderPath := fmt.Sprintf("%s/%s", folderPath, output.RepoPath)

planPath := fmt.Sprintf("%s/%s", tofuFolderPath, tfPlanPath)
gitRepo, err := git.PlainClone(folderPath, false, &git.CloneOptions{
URL: output.RepoUrl,
Progress: os.Stdout,
})

defer func(folderPath string) {
err = os.RemoveAll(folderPath)
if err != nil {
log.Error().Err(err)
return
}
}(folderPath)

switch output.CommitType {
case models.TAG:
_, err = gitRepo.Tag(output.CommitName)
if err != nil {
log.Error().Err(err)
return
}
case models.SHA:
_, err = gitRepo.CommitObject(plumbing.NewHash(output.CommitName))
if err != nil {
log.Error().Err(err)
return
}
case models.BRANCH:
_, err = gitRepo.CommitObject(plumbing.NewHash(output.CommitName))
if err != nil {
log.Error().Err(err)
return
}
}

if err = createBackendFile(tofuFolderPath, "./.local-state"); err != nil {
log.Error().Err(err)
return
}

if err = handler.savePlanAsTfPlan(planPath, output.PlanRaw); err != nil {
log.Error().Err(err)
return
}

tofu, err := iac.NewTofuIacService(tofuFolderPath)
if err != nil {
log.Error().Err(err)
return
}
_, err = tofu.Apply(output.PlanId, output.InlineEnvVariable(), output.InlineVariable(), planPath, ctx)

}

func (handler *scheduledIaCApplyHandler) savePlanAsTfPlan(path string, planRaw []byte) error {
fo, err := os.Create(path)
if err != nil {
panic(err)
}
// close fo on exit and check for its returned error
defer func() {
if err = fo.Close(); err != nil {
err = errors.Wrap(err, "problem with close file")
}
}()

w := bufio.NewWriter(fo)
if _, err = w.Write(planRaw); err != nil {
err = errors.Wrap(err, "problem with write file")
}

return err
}
2 changes: 1 addition & 1 deletion internal/managers/delay_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (dt *delayTask) Listen(ctx context.Context) {
tasks := make([]task, len(resultSet))

if len(tasks) == 0 {
log.Info().Msg("nothing to publish")
log.Trace().Msg("nothing to publish")
return
}

Expand Down
Loading

0 comments on commit 7e7b575

Please sign in to comment.