Skip to content

Commit

Permalink
chore: remove unnecessary concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
emosbaugh committed Jan 23, 2025
1 parent 29c846c commit 2851abe
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/plan/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func executeAppUpgradeService(s store.Store, p *types.Plan, step *types.PlanStep
return errors.Wrap(err, "update step status")
}

if err := waitForStep(p, step.ID); err != nil {
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for upgrade service")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/plan/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func executeK0sUpgrade(s store.Store, p *types.Plan, step *types.PlanStep) error
}
}

if err := waitForStep(p, step.ID); err != nil {
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for k0s upgrade")
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/plan/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
"github.com/replicatedhq/kots/pkg/embeddedcluster"
"github.com/replicatedhq/kots/pkg/plan/types"
"github.com/replicatedhq/kots/pkg/store"
"github.com/replicatedhq/kots/pkg/websocket"
"github.com/segmentio/ksuid"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -81,7 +82,7 @@ func planECExtensions(kcli kbclient.Client, newSpec *ecv1beta1.ConfigSpec) ([]*t
return steps, nil
}

func executeECExtensionAdd(p *types.Plan, step *types.PlanStep) error {
func executeECExtensionAdd(s store.Store, p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension add step")
Expand All @@ -93,14 +94,14 @@ func executeECExtensionAdd(p *types.Plan, step *types.PlanStep) error {
}
}

if err := waitForStep(p, step.ID); err != nil {
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension add")
}

return nil
}

func executeECExtensionUpgrade(p *types.Plan, step *types.PlanStep) error {
func executeECExtensionUpgrade(s store.Store, p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension upgrade step")
Expand All @@ -112,14 +113,14 @@ func executeECExtensionUpgrade(p *types.Plan, step *types.PlanStep) error {
}
}

if err := waitForStep(p, step.ID); err != nil {
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension upgrade")
}

return nil
}

func executeECExtensionRemove(p *types.Plan, step *types.PlanStep) error {
func executeECExtensionRemove(s store.Store, p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension remove step")
Expand All @@ -131,7 +132,7 @@ func executeECExtensionRemove(p *types.Plan, step *types.PlanStep) error {
}
}

if err := waitForStep(p, step.ID); err != nil {
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension remove")
}

Expand Down
19 changes: 10 additions & 9 deletions pkg/plan/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ var wsDialer = &gwebsocket.Dialer{
}

func TestUpgradeECManager(t *testing.T) {
// Create a mock store
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)

// Test app
versionLabel := "test-version-label"
app := &apptypes.App{
Expand Down Expand Up @@ -66,7 +61,7 @@ spec:
name string
managers []manager
app *apptypes.App
mockStoreExpectations func()
mockStoreExpectations func(mockStore *mock_store.MockStore)
wantSteps []*types.PlanStep
}{
{
Expand All @@ -78,7 +73,7 @@ spec:
},
},
app: app,
mockStoreExpectations: func() {
mockStoreExpectations: func(mockStore *mock_store.MockStore) {
mockStore.EXPECT().GetAppFromSlug(app.Slug).Return(app, nil).Times(1)
mockStore.EXPECT().GetPlan(app.ID, versionLabel).Return(p, nil).AnyTimes()
mockStore.EXPECT().UpsertPlan(p).Return(nil).Times(1)
Expand Down Expand Up @@ -116,7 +111,7 @@ spec:
},
},
app: app,
mockStoreExpectations: func() {
mockStoreExpectations: func(mockStore *mock_store.MockStore) {
mockStore.EXPECT().GetAppFromSlug(app.Slug).Return(app, nil).Times(2)
mockStore.EXPECT().GetPlan(app.ID, versionLabel).Return(p, nil).AnyTimes()
mockStore.EXPECT().UpsertPlan(p).Return(nil).Times(2)
Expand Down Expand Up @@ -153,8 +148,14 @@ spec:
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a mock store
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockStore := mock_store.NewMockStore(ctrl)

// Mock store expectations
tt.mockStoreExpectations()
tt.mockStoreExpectations(mockStore)

// Create and start test server
ts := NewTestServer(t)
Expand Down
52 changes: 23 additions & 29 deletions pkg/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,24 @@ func Resume(s store.Store) error {

// TODO (@salah): make each step report better status
func Execute(s store.Store, p *types.Plan) error {
stopCh := make(chan struct{})
defer close(stopCh)
go startPlanMonitor(s, p, stopCh)

for _, step := range p.Steps {
if err := executeStep(s, p, step); err != nil {
return errors.Wrap(err, "execute step")
}

updated, err := s.GetPlan(p.AppID, p.VersionLabel)
if err != nil {
logger.Errorf("Failed to get plan: %v", err)
continue
}
*p = *updated
}

logger.Infof("Plan %q completed successfully", p.ID)

return nil
}

func startPlanMonitor(s store.Store, p *types.Plan, stopCh chan struct{}) {
for {
select {
case <-stopCh:
return
case <-time.After(time.Second * 2):
updated, err := s.GetPlan(p.AppID, p.VersionLabel)
if err != nil {
logger.Error(errors.Wrap(err, "get plan"))
continue
}
*p = *updated
}
}
}

func executeStep(s store.Store, p *types.Plan, step *types.PlanStep) (finalError error) {
defer func() {
if finalError != nil {
Expand Down Expand Up @@ -189,17 +176,17 @@ func executeStep(s store.Store, p *types.Plan, step *types.PlanStep) (finalError
}

case types.StepTypeECExtensionAdd:
if err := executeECExtensionAdd(p, step); err != nil {
if err := executeECExtensionAdd(s, p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension add")
}

case types.StepTypeECExtensionUpgrade:
if err := executeECExtensionUpgrade(p, step); err != nil {
if err := executeECExtensionUpgrade(s, p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension upgrade")
}

case types.StepTypeECExtensionRemove:
if err := executeECExtensionRemove(p, step); err != nil {
if err := executeECExtensionRemove(s, p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension remove")
}

Expand All @@ -215,24 +202,31 @@ func executeStep(s store.Store, p *types.Plan, step *types.PlanStep) (finalError
return nil
}

func waitForStep(p *types.Plan, stepID string) error {
func waitForStep(s store.Store, p *types.Plan, stepID string) error {
for {
updated, err := s.GetPlan(p.AppID, p.VersionLabel)
if err != nil {
logger.Errorf("Failed to get plan: %v", err)
time.Sleep(time.Second * 2)
continue
}

stepIndex := -1
for i, step := range p.Steps {
for i, step := range updated.Steps {
if step.ID == stepID {
stepIndex = i
break
}
}
if stepIndex == -1 {
return errors.Errorf("step %s not found in plan %s", stepID, p.ID)
return errors.Errorf("step %s not found in plan %s", stepID, updated.ID)
}

if p.Steps[stepIndex].Status == types.StepStatusComplete {
if updated.Steps[stepIndex].Status == types.StepStatusComplete {
return nil
}
if p.Steps[stepIndex].Status == types.StepStatusFailed {
return errors.Errorf("step failed: %s", p.Steps[stepIndex].StatusDescription)
if updated.Steps[stepIndex].Status == types.StepStatusFailed {
return errors.Errorf("step failed: %s", updated.Steps[stepIndex].StatusDescription)
}

time.Sleep(time.Second * 2)
Expand Down

0 comments on commit 2851abe

Please sign in to comment.