Skip to content

Commit

Permalink
Separate success and failure flows in asoctl resource importing (#4452)
Browse files Browse the repository at this point in the history
* Change interface report to minimize coupling

* Create import_error.go

* Separate success and failure information flows

* Tweak error production

* Simplify interfaces

* Fix merge
  • Loading branch information
theunrepentantgeek authored Nov 25, 2024
1 parent c39cf82 commit bc42335
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 91 deletions.
36 changes: 36 additions & 0 deletions v2/cmd/asoctl/pkg/importresources/import_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT license.
*/

package importresources

import (
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"
)

type ImportError struct {
err error // The error that caused the import to fail
gk schema.GroupKind // The GroupKind of the resource that failed to import
name string // The name of the resource that failed to import
}

var _ error = &ImportError{}

func MakeImportError(err error, gk schema.GroupKind, name string) ImportError {
return ImportError{
err: err,
gk: gk,
name: name,
}
}

func (ie *ImportError) Error() string {
return fmt.Sprintf("failed to import %s %s: %s", ie.gk, ie.name, ie.err.Error())
}

func (ie *ImportError) Unwrap() error {
return ie.err
}
32 changes: 19 additions & 13 deletions v2/cmd/asoctl/pkg/importresources/importable_arm_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,44 +100,50 @@ func (i *importableARMResource) Resource() genruntime.MetaObject {
// ctx is the context to use for the import.
func (i *importableARMResource) Import(
ctx context.Context,
progress importreporter.Interface,
_ logr.Logger,
) (ImportedResource, error) {
) (ImportResourceResult, error) {
// Create an importable blank object into which we capture the current state of the resource
importable, err := i.createImportableObjectFromID(i.owner, i.armID)
if err != nil {
// Error doesn't need additional context
return i, err
return ImportResourceResult{}, err
}

// Our resource might have an extension that can customize the import process,
// so we have a factory to create the loader function we call.
loader := i.createImportFunction(importable)
result, err := loader(ctx, importable, i.owner)
loaderResult, err := loader(ctx, importable, i.owner)
if err != nil {
i.err = err
return i, err
return ImportResourceResult{}, err
}

if because, skipped := result.Skipped(); skipped {
if because, skipped := loaderResult.Skipped(); skipped {
gk := importable.GetObjectKind().GroupVersionKind().GroupKind()
return i, NewSkippedError(gk, i.armID.Name, because, i)
return ImportResourceResult{}, NewSkippedError(gk, i.armID.Name, because, i)
}

i.resource = importable

return i, nil
}
result := ImportResourceResult{
resource: i,
}

if children, err := i.findChildren(ctx, progress); err != nil {
return result, err
} else {
result.pending = children
}

// Error returns any error that occurred during the import.
func (i *importableARMResource) Error() error {
return i.err
return result, nil
}

// FindChildren returns any child resources that need to be imported.
// findChildren returns any child resources that need to be imported.
// ctx allows for cancellation of the import.
// Returns any additional resources that also need to be imported, as well as any errors that occur.
// Partial success is allowed, but the caller should be notified of any errors.
func (i *importableARMResource) FindChildren(
func (i *importableARMResource) findChildren(
ctx context.Context,
progress importreporter.Interface,
) ([]ImportableResource, error) {
Expand Down
12 changes: 2 additions & 10 deletions v2/cmd/asoctl/pkg/importresources/importable_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,10 @@ type ImportableResource interface {
// ctx allows for cancellation of the import.
// log allows information about progress to be reported
Import(
ctx context.Context,
log logr.Logger,
) (ImportedResource, error)

// FindChildren returns any child resources that need to be imported.
// ctx allows for cancellation of the import.
// Returns any additional resources that also need to be imported, as well as any errors that occur.
// Partial success is allowed, but the caller should be notified of any errors.
FindChildren(
ctx context.Context,
reporter importreporter.Interface,
) ([]ImportableResource, error)
log logr.Logger,
) (ImportResourceResult, error)
}

// importableResource is a core of common data and support methods for implementing ImportableResource
Expand Down
3 changes: 0 additions & 3 deletions v2/cmd/asoctl/pkg/importresources/imported_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,4 @@ type ImportedResource interface {

// Resource returns the actual resource that has been imported.
Resource() genruntime.MetaObject

// Error returns any error that occurred during the import.
Error() error
}
136 changes: 80 additions & 56 deletions v2/cmd/asoctl/pkg/importresources/resource_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ type ResourceImporterOptions struct {
}

type ImportResourceResult struct {
resource ImportedResource
pending []ImportableResource
err error
resource ImportedResource // The resource that was imported
pending []ImportableResource // Any child resources that need to be imported next
}

// New creates a new factory with the scheme baked in
Expand Down Expand Up @@ -83,21 +82,25 @@ func (ri *ResourceImporter) Import(
done chan struct{},
) (*Result, error) {
workersRequired := ri.desiredWorkers()

candidates := make(chan ImportableResource) // candidates that need to be deduped
pending := make(chan ImportableResource) // importers that are pending import
completed := make(chan ImportResourceResult) // importers that have been executed successfully
report := make(chan *resourceImportReport) // summary report of the import
successes := make(chan ImportResourceResult) // importers that have been executed successfully
failures := make(chan ImportError) // errors from importers that failed
completions := make(chan struct{}) // channel to signal completion

// Dedupe candidates so we import each distinct resource only once
go ri.queueUniqueImporters(candidates, pending, ri.reporter)

// Create workers to run the import
for i := 0; i < workersRequired; i++ {
go ri.importWorker(ctx, pending, completed, ri.reporter)
go ri.importWorker(ctx, pending, successes, failures, ri.reporter, completions)
}

// Collate the results
go ri.collateResults(completed, candidates, ri.reporter, report)
report := newResourceImportReport()
go ri.collateResults(successes, candidates, ri.reporter, report, completions)
go ri.collateErrors(failures, report, completions)

// Set up by adding our initial resources; these will be completed when we collate their results
for _, rsrc := range ri.resources {
Expand All @@ -117,11 +120,15 @@ func (ri *ResourceImporter) Import(
// Close channels so final reporting and other cleanup occurs
close(candidates)
close(pending)
close(completed)
close(successes)

// Wait for everything to finish
for i := 0; i < workersRequired+2; i++ {
<-completions
}

// Get the summary report and write it
rpt := <-report
rpt.WriteToLog(ri.log)
report.WriteToLog(ri.log)

// Now we've imported everything, return the resources
// We do this even if there's an error so that we can return partial results
Expand Down Expand Up @@ -186,11 +193,21 @@ func (ri *ResourceImporter) queueUniqueImporters(
}
}

// importerWorker is a goroutine for importing resources.
// It reads from the pending channel, imports the resource, and sends the result to the completed
// channel if it worked, or the failed channel if it didn't.
// ctx is used to check for cancellation.
// pending is a source of resources to import.
// completed is where we send the result of a successful import.
// failed is where we send the error from a failed import.
// done is a channel we signal when we're finished.
func (ri *ResourceImporter) importWorker(
ctx context.Context,
pending <-chan ImportableResource,
completed chan<- ImportResourceResult,
failed chan<- ImportError,
progress importreporter.Interface,
done chan<- struct{},
) {
for rsrc := range pending {
if ctx.Err() != nil {
Expand All @@ -201,18 +218,24 @@ func (ri *ResourceImporter) importWorker(

// We have a resource to import
ri.log.V(1).Info("Importing", "resource", rsrc.ID())
result := ri.importResource(ctx, rsrc, progress)
completed <- result

if imported, err := ri.importResource(ctx, rsrc, progress); err != nil {
failed <- MakeImportError(err, rsrc.GroupKind(), rsrc.Name())
} else {
completed <- imported
}
}

done <- struct{}{}
}

func (ri *ResourceImporter) collateResults(
completed <-chan ImportResourceResult, // completed imports for us to collate
candidates chan<- ImportableResource, // additional candidates for importing
progress importreporter.Interface, // importreporter tracking
publish chan<- *resourceImportReport, // publishing our final summary
report *resourceImportReport, // report to write to
done chan<- struct{}, // channel to signal completion
) {
report := newResourceImportReport()
for importResult := range completed {
rsrc := importResult.resource
gk := rsrc.GroupKind()
Expand All @@ -223,46 +246,54 @@ func (ri *ResourceImporter) collateResults(
candidates <- p
}

if importResult.err != nil {
var skipped *SkippedError
if eris.As(importResult.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", gk,
"name", rsrc.Name(),
"because", skipped.Because)
report.AddSkippedImport(rsrc, skipped.Because)
} else {
ri.log.Error(importResult.err,
"Failed",
"kind", gk,
"name", rsrc.Name())
ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

report.AddFailedImport(rsrc, importResult.err.Error())
}
} else {
ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

report.AddSuccessfulImport(rsrc)
ri.imported[rsrc.ID()] = rsrc
}
report.AddSuccessfulImport(gk)
ri.imported[rsrc.ID()] = rsrc

// Flag the main resource as complete
// We do this after everything else because it might indicate we're finished
progress.Completed(1)
}

publish <- report
done <- struct{}{}
}

func (ri *ResourceImporter) collateErrors(
failures <-chan ImportError,
report *resourceImportReport,
done chan<- struct{},
) {
for ie := range failures {
var skipped *SkippedError
if eris.As(ie.err, &skipped) {
ri.log.V(1).Info(
"Skipped",
"kind", ie.gk,
"name", ie.name,
"because", skipped.Because)
report.AddSkippedImport(ie.gk, skipped.Because)
} else {
ri.log.Error(ie.err,
"Failed",
"kind", ie.gk,
"name", ie.name)

report.AddFailedImport(ie.gk, ie.err.Error())
}
}

done <- struct{}{}
}

func (ri *ResourceImporter) importResource(
ctx context.Context,
rsrc ImportableResource,
parent importreporter.Interface,
) ImportResourceResult {
) (ImportResourceResult, error) {
// Import it
gk := rsrc.GroupKind()
name := fmt.Sprintf("%s %s", gk, rsrc.Name())
Expand All @@ -273,23 +304,16 @@ func (ri *ResourceImporter) importResource(
// Our main resource is pending
progress.AddPending(1)

// Import the resource itself
imported, err := rsrc.Import(ctx, ri.log)
result := ImportResourceResult{
resource: imported,
err: err,
}

// If the main resource was imported ok, look for any children
if result.err == nil {
result.pending, result.err = rsrc.FindChildren(ctx, progress)
}

// Indicate the main resource is complete
// (we must only do this after checking for children, to ensure we don't appear complete too early)
progress.Completed(1)
// (we must only do this when we return, to ensure we don't appear complete too early)
defer progress.Completed(1)

return result
// Import the resource itself
if imported, err := rsrc.Import(ctx, progress, ri.log); err != nil {
return ImportResourceResult{}, eris.Wrapf(err, "importing %s", name)
} else {
return imported, nil
}
}

// desiredWorkers returns the number of workers to use for importing resources.
Expand Down
Loading

0 comments on commit bc42335

Please sign in to comment.