diff --git a/pkg/daemon/criruntime/factory.go b/pkg/daemon/criruntime/factory.go index 8279b1cd6d..bebef92198 100644 --- a/pkg/daemon/criruntime/factory.go +++ b/pkg/daemon/criruntime/factory.go @@ -90,12 +90,6 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan var typedVersion *runtimeapi.VersionResponse switch cfg.runtimeType { - case ContainerRuntimeDocker: - imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager) - if err != nil { - klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) - continue - } case ContainerRuntimeContainerd, ContainerRuntimeCommonCRI, ContainerRuntimePouch: addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI) if err != nil { @@ -107,6 +101,12 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) continue } + case ContainerRuntimeDocker: + imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager) + if err != nil { + klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err) + continue + } } if _, err = imageService.ListImages(context.TODO()); err != nil { diff --git a/pkg/daemon/criruntime/imageruntime/containerd.go b/pkg/daemon/criruntime/imageruntime/containerd.go index e4706708c5..1b9921e406 100644 --- a/pkg/daemon/criruntime/imageruntime/containerd.go +++ b/pkg/daemon/criruntime/imageruntime/containerd.go @@ -73,6 +73,7 @@ func NewContainerdImageService( accountManager: accountManager, snapshotter: snapshotter, client: client, + // TODO: compatible with v1alpha2 cri api criImageClient: runtimeapi.NewImageServiceClient(conn), httpProxy: httpProxy, }, nil @@ -325,6 +326,7 @@ func getDefaultValuesFromCRIStatus(conn *grpc.ClientConn) (snapshotter string, h ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() + // TODO: compatible with v1alpha2 cri api rclient := runtimeapi.NewRuntimeServiceClient(conn) resp, err := rclient.Status(ctx, &runtimeapi.StatusRequest{Verbose: true}) if err != nil { diff --git a/pkg/daemon/criruntime/imageruntime/cri.go b/pkg/daemon/criruntime/imageruntime/cri.go index 7190e662e1..a4072985c8 100644 --- a/pkg/daemon/criruntime/imageruntime/cri.go +++ b/pkg/daemon/criruntime/imageruntime/cri.go @@ -16,6 +16,7 @@ package imageruntime import ( "context" "io" + "reflect" "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -26,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + runtimeapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cri/remote/util" "k8s.io/kubernetes/pkg/util/parsers" @@ -53,26 +55,47 @@ func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAc return nil, err } - klog.V(4).InfoS("Finding the CRI API image version") - imageClient := runtimeapi.NewImageServiceClient(conn) - - if _, err := imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}); err == nil { - klog.V(2).InfoS("Using CRI v1 image API") + imageClientV1, imageClientV1alpha2, err := determineImageClientAPIVersion(conn) + if err != nil { + klog.ErrorS(err, "Failed to determine CRI image API version") + return nil, err } return &commonCRIImageService{ - accountManager: accountManager, - criImageClient: imageClient, + accountManager: accountManager, + criImageClient: imageClientV1, + criImageClientV1alpha2: imageClientV1alpha2, }, nil } type commonCRIImageService struct { - accountManager daemonutil.ImagePullAccountManager - criImageClient runtimeapi.ImageServiceClient + accountManager daemonutil.ImagePullAccountManager + criImageClient runtimeapi.ImageServiceClient + criImageClientV1alpha2 runtimeapiv1alpha2.ImageServiceClient +} + +func (c *commonCRIImageService) useV1API() bool { + return c.criImageClientV1alpha2 == nil || reflect.ValueOf(c.criImageClientV1alpha2).IsNil() } // PullImage implements ImageService.PullImage. func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) { + if c.useV1API() { + return c.pullImageV1(ctx, imageName, tag, pullSecrets, sandboxConfig) + } + return c.pullImageV1alpha2(ctx, imageName, tag, pullSecrets, sandboxConfig) +} + +// ListImages implements ImageService.ListImages. +func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) { + if c.useV1API() { + return c.listImagesV1(ctx) + } + return c.listImagesV1alpha2(ctx) +} + +// PullImage implements ImageService.PullImage using v1 CRI client. +func (c *commonCRIImageService) pullImageV1(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) { registry := daemonutil.ParseRegistry(imageName) fullImageName := imageName + ":" + tag repoToPull, _, _, err := parsers.ParseImageName(fullImageName) @@ -172,8 +195,8 @@ func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag st return newImagePullStatusReader(pipeR), nil } -// ListImages implements ImageService.ListImages. -func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) { +// ListImages implements ImageService.ListImages using V1 CRI client. +func (c *commonCRIImageService) listImagesV1(ctx context.Context) ([]ImageInfo, error) { listImagesReq := &runtimeapi.ListImagesRequest{} listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq) if err != nil { @@ -190,3 +213,123 @@ func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, er } return collection, nil } + +// PullImage implements ImageService.PullImage using v1alpha2 CRI client. +func (c *commonCRIImageService) pullImageV1alpha2(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) { + registry := daemonutil.ParseRegistry(imageName) + fullImageName := imageName + ":" + tag + repoToPull, _, _, err := parsers.ParseImageName(fullImageName) + if err != nil { + return nil, err + } + // Reader + pipeR, pipeW := io.Pipe() + defer pipeW.Close() + + var auth *runtimeapiv1alpha2.AuthConfig + pullImageReq := &runtimeapiv1alpha2.PullImageRequest{ + Image: &runtimeapiv1alpha2.ImageSpec{ + Image: fullImageName, + Annotations: make(map[string]string), + }, + Auth: auth, //default is nil + } + if sandboxConfig != nil { + pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{ + Annotations: sandboxConfig.Annotations, + Labels: sandboxConfig.Labels, + } + if pullImageReq.SandboxConfig.Annotations == nil { + pullImageReq.SandboxConfig.Annotations = map[string]string{} + } + } else { + pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{ + Annotations: map[string]string{}, + } + } + // Add this default annotation to avoid unexpected panic caused by sandboxConfig is nil + // for some runtime implementations. + pullImageReq.SandboxConfig.Annotations[pullingImageSandboxConfigAnno] = "kruise-daemon" + + if len(pullSecrets) > 0 { + var authInfos []daemonutil.AuthInfo + authInfos, err = convertToRegistryAuths(pullSecrets, repoToPull) + if err == nil { + var pullErrs []error + for _, authInfo := range authInfos { + var pullErr error + klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username) + pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{ + Username: authInfo.Username, + Password: authInfo.Password, + } + _, pullErr = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq) + if pullErr == nil { + pipeW.CloseWithError(io.EOF) + return newImagePullStatusReader(pipeR), nil + } + klog.Warningf("Failed to pull image %v:%v with user %v, err %v", imageName, tag, authInfo.Username, pullErr) + pullErrs = append(pullErrs, pullErr) + + } + if len(pullErrs) > 0 { + err = utilerrors.NewAggregate(pullErrs) + } + } + } + + // Try the default secret + if c.accountManager != nil { + var authInfo *daemonutil.AuthInfo + var defaultErr error + authInfo, defaultErr = c.accountManager.GetAccountInfo(registry) + if defaultErr != nil { + klog.Warningf("Failed to get account for registry %v, err %v", registry, defaultErr) + // When the default account acquisition fails, try to pull anonymously + } else if authInfo != nil { + klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username) + pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{ + Username: authInfo.Username, + Password: authInfo.Password, + } + _, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq) + if err == nil { + pipeW.CloseWithError(io.EOF) + return newImagePullStatusReader(pipeR), nil + } + klog.Warningf("Failed to pull image %v:%v, err %v", imageName, tag, err) + return nil, err + } + } + + if err != nil { + return nil, err + } + + // Anonymous pull + _, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq) + if err != nil { + return nil, errors.Wrapf(err, "Failed to pull image reference %q", fullImageName) + } + pipeW.CloseWithError(io.EOF) + return newImagePullStatusReader(pipeR), nil +} + +// ListImages implements ImageService.ListImages using V1alpha2 CRI client. +func (c *commonCRIImageService) listImagesV1alpha2(ctx context.Context) ([]ImageInfo, error) { + listImagesReq := &runtimeapiv1alpha2.ListImagesRequest{} + listImagesResp, err := c.criImageClientV1alpha2.ListImages(ctx, listImagesReq) + if err != nil { + return nil, err + } + collection := make([]ImageInfo, 0, len(listImagesResp.GetImages())) + for _, img := range listImagesResp.GetImages() { + collection = append(collection, ImageInfo{ + ID: img.GetId(), + RepoTags: img.GetRepoTags(), + RepoDigests: img.GetRepoDigests(), + Size: int64(img.GetSize_()), + }) + } + return collection, nil +} diff --git a/pkg/daemon/criruntime/imageruntime/helpers.go b/pkg/daemon/criruntime/imageruntime/helpers.go index cfc3804010..cd1e18dbc7 100644 --- a/pkg/daemon/criruntime/imageruntime/helpers.go +++ b/pkg/daemon/criruntime/imageruntime/helpers.go @@ -17,14 +17,21 @@ limitations under the License. package imageruntime import ( + "context" "encoding/json" "fmt" "io" + "time" dockermessage "github.com/docker/docker/pkg/jsonmessage" daemonutil "github.com/openkruise/kruise/pkg/daemon/util" "github.com/openkruise/kruise/pkg/util" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + runtimeapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/credentialprovider" credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets" @@ -239,3 +246,23 @@ func (c ImageInfo) ContainsImage(name string, tag string) bool { } return false } + +func determineImageClientAPIVersion(conn *grpc.ClientConn) (runtimeapi.ImageServiceClient, runtimeapiv1alpha2.ImageServiceClient, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + klog.V(4).InfoS("Finding the CRI API image version") + imageClientV1 := runtimeapi.NewImageServiceClient(conn) + + _, err := imageClientV1.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}) + if err == nil { + klog.V(2).InfoS("Using CRI v1 image API") + return imageClientV1, nil, nil + + } else if status.Code(err) == codes.Unimplemented { + klog.V(2).InfoS("Falling back to CRI v1alpha2 image API (deprecated in k8s 1.24)") + return nil, runtimeapiv1alpha2.NewImageServiceClient(conn), nil + } + + return nil, nil, fmt.Errorf("unable to determine image API version: %w", err) +}