Skip to content

Commit

Permalink
fix: change image pull and push (#2057)
Browse files Browse the repository at this point in the history
Signed-off-by: 张启航 <[email protected]>
  • Loading branch information
ZhangSetSail authored Nov 20, 2024
1 parent dd67274 commit 974e5ee
Showing 1 changed file with 91 additions and 82 deletions.
173 changes: 91 additions & 82 deletions builder/sources/image_containerd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,6 @@ func (c *containerdImageCliImpl) ImagePull(image string, username, password stri
hostOpt.Credentials = func(host string) (string, string, error) {
return username, password, nil
}
// 如果 image 以 "https://" 或 "http://" 开头,去掉前缀
if strings.HasPrefix(image, "https://") {
image = strings.TrimPrefix(image, "https://")

} else if strings.HasPrefix(image, "http://") {
image = strings.TrimPrefix(image, "http://")
hostOpt.DefaultScheme = "http"
} else {
hostOpt.DefaultScheme = "http"
}

named, err := refdocker.ParseDockerRef(image)
if err != nil {
Expand All @@ -130,6 +120,8 @@ func (c *containerdImageCliImpl) ImagePull(image string, username, password stri
ongoing := NewJobs(reference)
ctx := namespaces.WithNamespace(context.Background(), Namespace)
pctx, stopProgress := context.WithCancel(ctx)
defer stopProgress()

progress := make(chan struct{})

go func() {
Expand All @@ -152,22 +144,40 @@ func (c *containerdImageCliImpl) ImagePull(image string, username, password stri
platformMC := platforms.Ordered([]ocispec.Platform{platforms.DefaultSpec()}...)
opts := []containerd.RemoteOpt{
containerd.WithImageHandler(h),
//nolint:staticcheck
containerd.WithSchema1Conversion, //lint:ignore SA1019 nerdctl should support schema1 as well.
containerd.WithPlatformMatcher(platformMC),
containerd.WithResolver(docker.NewResolver(options)),
}

// First attempt with HTTPS
hostOpt.DefaultScheme = "https"
opts = append(opts, containerd.WithResolver(docker.NewResolver(options)))

var img containerd.Image
img, err = c.client.Pull(pctx, reference, opts...)
stopProgress()
if err != nil {
// If protocol error occurs, try HTTP
if isProtocolError(err) {
printLog(logger, "warn", fmt.Sprintf("HTTPS pull failed for image %s, trying HTTP", reference), map[string]string{"step": "pullimage"})
hostOpt.DefaultScheme = "http"
img, err = c.client.Pull(pctx, reference, opts...)
}
}

if err != nil {
stopProgress()
return nil, err
}

<-progress
printLog(logger, "info", fmt.Sprintf("Success Pull Image:%s", reference), map[string]string{"step": "pullimage"})
return getImageConfig(ctx, img)
}

// Helper function to check if the error is protocol related
func isProtocolError(err error) bool {
return strings.Contains(err.Error(), "invalid protocol") || strings.Contains(err.Error(), "unsupported protocol scheme")
}

func getImageConfig(ctx context.Context, image containerd.Image) (*ocispec.ImageConfig, error) {
desc, err := image.Config(ctx)
if err != nil {
Expand All @@ -192,6 +202,7 @@ func getImageConfig(ctx context.Context, image containerd.Image) (*ocispec.Image

func (c *containerdImageCliImpl) ImagePush(image, user, pass string, logger event.Logger, timeout int) error {
printLog(logger, "info", fmt.Sprintf("开始推送镜像:%s", image), map[string]string{"step": "pushimage"})

hostOptions := config.HostOptions{
DefaultTLS: &tls.Config{
InsecureSkipVerify: true,
Expand All @@ -200,16 +211,6 @@ func (c *containerdImageCliImpl) ImagePush(image, user, pass string, logger even
return user, pass, nil
},
}
// 如果 image 以 "https://" 或 "http://" 开头,去掉前缀
if strings.HasPrefix(image, "https://") {
image = strings.TrimPrefix(image, "https://")

} else if strings.HasPrefix(image, "http://") {
image = strings.TrimPrefix(image, "http://")
hostOptions.DefaultScheme = "http"
} else {
hostOptions.DefaultScheme = "http"
}

named, err := refdocker.ParseDockerRef(image)
if err != nil {
Expand Down Expand Up @@ -245,68 +246,83 @@ func (c *containerdImageCliImpl) ImagePush(image, user, pass string, logger even
Tracker: NewTracker,
}

options.Hosts = config.ConfigureHosts(ctx, hostOptions)
resolver := docker.NewResolver(options)
ongoing := newPushJobs(NewTracker)

// 使用 error group 进行推送任务管理
eg, ctx := errgroup.WithContext(ctx)
doneCh := make(chan struct{})

// 镜像推送任务
eg.Go(func() error {
defer close(doneCh)
jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
})

ropts := []containerd.RemoteOpt{
containerd.WithResolver(resolver),
containerd.WithImageHandler(jobHandler),
}
var pushFunc = func(scheme string) error {
// 配置指定协议
hostOptions.DefaultScheme = scheme
options.Hosts = config.ConfigureHosts(ctx, hostOptions)
resolver := docker.NewResolver(options)

// 镜像推送任务
eg, ctx := errgroup.WithContext(ctx)
doneCh := make(chan struct{})

eg.Go(func() error {
defer close(doneCh)
jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
})

ropts := []containerd.RemoteOpt{
containerd.WithResolver(resolver),
containerd.WithImageHandler(jobHandler),
}

// 尝试多次推送,确保清理缓存后重新拉取并推送
for attempts := 0; attempts < 3; attempts++ {
if err := c.client.Push(ctx, reference, desc, ropts...); err != nil {
if attempts < 2 {
printLog(logger, "warn", fmt.Sprintf("推送失败,重试中... (%d/3)", attempts+1), map[string]string{"step": "pushimage"})
// 清理缓存镜像重新拉取
_ = c.client.ImageService().Delete(ctx, reference)
time.Sleep(5 * time.Second)
continue
// 尝试多次推送,确保清理缓存后重新拉取并推送
for attempts := 0; attempts < 3; attempts++ {
if err := c.client.Push(ctx, reference, desc, ropts...); err != nil {
if attempts < 2 {
printLog(logger, "warn", fmt.Sprintf("推送失败,重试中... (%d/3)", attempts+1), map[string]string{"step": "pushimage"})
_ = c.client.ImageService().Delete(ctx, reference) // 清理缓存镜像
time.Sleep(5 * time.Second)
continue
}
return errors.Wrap(err, "推送过程中发生错误")
}
return errors.Wrap(err, "推送过程中发生错误")
break
}
break
}
return nil
})
return nil
})

// 进度显示任务
eg.Go(func() error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
done := false

for {
select {
case <-ticker.C:
Display(ongoing.status(), start, logger)
if done {
return nil
// 进度显示任务
eg.Go(func() error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
done := false

for {
select {
case <-ticker.C:
Display(ongoing.status(), start, logger)
if done {
return nil
}
case <-doneCh:
done = true
case <-ctx.Done():
done = true
}
case <-doneCh:
done = true
case <-ctx.Done():
done = true
}
})

// 等待所有 goroutine 完成
return eg.Wait()
}

// 先尝试 HTTPS
err = pushFunc("https")
if err != nil {
// 如果是协议相关错误,尝试降级为 HTTP
if isProtocolError(err) {
printLog(logger, "warn", fmt.Sprintf("HTTPS 推送失败,尝试使用 HTTP:%s", err.Error()), map[string]string{"step": "pushimage"})
err = pushFunc("http")
}
})
}

// 等待所有 goroutine 完成
if err := eg.Wait(); err != nil {
if err != nil {
return errors.Wrap(err, "推送失败")
}

Expand All @@ -321,13 +337,6 @@ func (c *containerdImageCliImpl) ImageTag(source, target string, logger event.Lo
return err
}
srcImage := srcNamed.String()

if strings.HasPrefix(target, "https://") {
target = strings.TrimPrefix(target, "https://")
} else if strings.HasPrefix(target, "http://") {
target = strings.TrimPrefix(target, "http://")
}

targetNamed, err := refdocker.ParseDockerRef(target)
if err != nil {
return err
Expand Down

0 comments on commit 974e5ee

Please sign in to comment.