Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (c *criContainerdService) startEventMonitor() error {
func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) {
// TODO(random-liu): [P1] Should backoff on this error, or else this will
// cause a busy loop.
// TODO(random-liu): Handle io.EOF.
e, err := events.Recv()
if err != nil {
glog.Errorf("Failed to receive event: %v", err)
Expand Down
73 changes: 54 additions & 19 deletions pkg/server/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
)

const (
// defaultSandboxImage is the image used by sandbox container.
// TODO(random-liu): [P1] Build schema 2 pause image and use it here.
defaultSandboxImage = "gcr.io/google.com/noogler-kubernetes/pause-amd64:3.0"
// relativeRootfsPath is the rootfs path relative to bundle path.
relativeRootfsPath = "rootfs"
// defaultRuntime is the runtime to use in containerd. We may support
Expand Down Expand Up @@ -305,31 +308,39 @@ func getRepoDigestAndTag(namedRef reference.Named, digest imagedigest.Digest) (s
return repoDigest, repoTag
}

// localResolve resolves image reference to image id locally. It returns empty string
// without error if the reference doesn't exist.
func (c *criContainerdService) localResolve(ctx context.Context, ref string) (string, error) {
// localResolve resolves image reference locally and returns corresponding image metadata. It returns
// nil without error if the reference doesn't exist.
func (c *criContainerdService) localResolve(ctx context.Context, ref string) (*metadata.ImageMetadata, error) {
_, err := imagedigest.Parse(ref)
if err == nil {
return ref, nil
}
// ref is not image id, try to resolve it locally.
normalized, err := normalizeImageRef(ref)
if err != nil {
return "", fmt.Errorf("invalid image reference %q: %v", ref, err)
}
image, err := c.imageStoreService.Get(ctx, normalized.String())
if err != nil {
if images.IsNotFound(err) {
return "", nil
// ref is not image id, try to resolve it locally.
normalized, err := normalizeImageRef(ref)
if err != nil {
return nil, fmt.Errorf("invalid image reference %q: %v", ref, err)
}
image, err := c.imageStoreService.Get(ctx, normalized.String())
if err != nil {
if images.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("an error occurred when getting image %q from containerd image store: %v",
normalized.String(), err)
}
desc, err := image.Config(ctx, c.contentStoreService)
if err != nil {
return nil, fmt.Errorf("failed to get image config descriptor: %v", err)
}
return "", fmt.Errorf("an error occurred when getting image %q from containerd image store: %v",
normalized.String(), err)
ref = desc.Digest.String()
}
desc, err := image.Config(ctx, c.contentStoreService)
imageID := ref
meta, err := c.imageMetadataStore.Get(imageID)
if err != nil {
return "", fmt.Errorf("failed to get image config descriptor: %v", err)
if metadata.IsNotExistError(err) {
return nil, nil
}
return nil, fmt.Errorf("failed to get image %q metadata: %v", imageID, err)
}
return desc.Digest.String(), nil
return meta, nil
}

// getUserFromImage gets uid or user name of the image user.
Expand All @@ -350,3 +361,27 @@ func getUserFromImage(user string) (*int64, string) {
// If user is a numeric uid.
return &uid, ""
}

// ensureImageExists returns corresponding metadata of the image reference, if image is not
// pulled yet, the function will pull the image.
func (c *criContainerdService) ensureImageExists(ctx context.Context, ref string) (*metadata.ImageMetadata, error) {
meta, err := c.localResolve(ctx, ref)
if err != nil {
return nil, fmt.Errorf("failed to resolve image %q: %v", ref, err)
}
if meta != nil {
return meta, nil
}
// Pull image to ensure the image exists
resp, err := c.PullImage(ctx, &runtime.PullImageRequest{Image: &runtime.ImageSpec{Image: ref}})
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %v", ref, err)
}
imageID := resp.GetImageRef()
meta, err = c.imageMetadataStore.Get(imageID)
if err != nil {
// It's still possible that someone removed the image right after it is pulled.
return nil, fmt.Errorf("failed to get image %q metadata after pulling: %v", imageID, err)
}
return meta, nil
}
17 changes: 5 additions & 12 deletions pkg/server/image_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,14 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov
glog.V(2).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
}()
imageID, err := c.localResolve(ctx, r.GetImage().GetImage())
meta, err := c.localResolve(ctx, r.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("can not resolve %q locally: %v", r.GetImage().GetImage(), err)
}
if imageID == "" {
if meta == nil {
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
meta, err := c.imageMetadataStore.Get(imageID)
if err != nil {
if metadata.IsNotExistError(err) {
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("an error occurred when get image %q metadata: %v", imageID, err)
}
// Also include repo digest, because if user pull image with digest,
// there will also be a corresponding repo digest reference.
for _, ref := range append(meta.RepoTags, meta.RepoDigests...) {
Expand All @@ -65,14 +58,14 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov
if err == nil || images.IsNotFound(err) {
continue
}
return nil, fmt.Errorf("failed to delete image reference %q for image %q: %v", ref, imageID, err)
return nil, fmt.Errorf("failed to delete image reference %q for image %q: %v", ref, meta.ID, err)
}
err = c.imageMetadataStore.Delete(imageID)
err = c.imageMetadataStore.Delete(meta.ID)
if err != nil {
if metadata.IsNotExistError(err) {
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("an error occurred when delete image %q matadata: %v", imageID, err)
return nil, fmt.Errorf("an error occurred when delete image %q matadata: %v", meta.ID, err)
}
return &runtime.RemoveImageResponse{}, nil
}
15 changes: 2 additions & 13 deletions pkg/server/image_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"

"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)

// ImageStatus returns the status of the image, returns nil if the image isn't present.
Expand All @@ -37,23 +35,14 @@ func (c *criContainerdService) ImageStatus(ctx context.Context, r *runtime.Image
r.GetImage().GetImage(), retRes.GetImage())
}
}()
imageID, err := c.localResolve(ctx, r.GetImage().GetImage())
meta, err := c.localResolve(ctx, r.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("can not resolve %q locally: %v", r.GetImage().GetImage(), err)
}
if imageID == "" {
if meta == nil {
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}

meta, err := c.imageMetadataStore.Get(imageID)
if err != nil {
if metadata.IsNotExistError(err) {
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("an error occurred during get image %q metadata: %v",
imageID, err)
}
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?
runtimeImage := &runtime.Image{
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/sandbox_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
glog.V(2).Infof("RemovePodSandbox for sandbox %q", r.GetPodSandboxId())
defer func() {
if retErr == nil {
glog.V(2).Info("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
glog.V(2).Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
}
}()

Expand Down Expand Up @@ -65,6 +65,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
return nil, fmt.Errorf("sandbox container %q is not fully stopped", id)
}

// TODO(random-liu): [P0] Cleanup snapshot after switching to new snapshot api.
// TODO(random-liu): [P0] Cleanup shm created in RunPodSandbox.
// TODO(random-liu): [P1] Remove permanent namespace once used.

Expand Down
76 changes: 49 additions & 27 deletions pkg/server/sandbox_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ import (
"fmt"
"io"
"io/ioutil"
"strings"
"time"

"github.com/containerd/containerd/api/services/execution"
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/opencontainers/runtime-tools/generate"
"golang.org/x/net/context"

"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/mount"

runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"

"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
Expand Down Expand Up @@ -81,10 +82,22 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
Config: config,
}

// TODO(random-liu): [P0] Ensure pause image snapshot, apply default image config
// and get snapshot mounts.
// Use fixed rootfs path and sleep command.
const rootPath = "/"
// Ensure sandbox container image snapshot.
imageMeta, err := c.ensureImageExists(ctx, c.sandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox image %q: %v", defaultSandboxImage, err)
}
prepareResp, err := c.rootfsService.Prepare(ctx, &rootfsapi.PrepareRequest{
Name: id,
// We are sure that ChainID must be a digest.
ChainID: imagedigest.Digest(imageMeta.ChainID),
Readonly: true,
})
if err != nil {
return nil, fmt.Errorf("failed to prepare sandbox rootfs %q: %v", imageMeta.ChainID, err)
}
// TODO(random-liu): [P0] Cleanup snapshot on failure after switching to new rootfs api.
rootfsMounts := prepareResp.Mounts

// Create sandbox container root directory.
// Prepare streaming named pipe.
Expand Down Expand Up @@ -124,7 +137,10 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}

// Start sandbox container.
spec := c.generateSandboxContainerSpec(id, config)
spec, err := c.generateSandboxContainerSpec(id, config, imageMeta.Config)
if err != nil {
return nil, fmt.Errorf("failed to generate sandbox container spec: %v", err)
}
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
Expand All @@ -137,16 +153,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
Value: rawSpec,
},
// TODO(random-liu): [P0] Get rootfs mount from containerd.
Rootfs: []*mount.Mount{
{
Type: "bind",
Source: rootPath,
Options: []string{
"rw",
"rbind",
},
},
},
Rootfs: rootfsMounts,
Runtime: defaultRuntime,
// No stdin for sandbox container.
Stdout: stdout,
Expand Down Expand Up @@ -205,19 +212,34 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}

func (c *criContainerdService) generateSandboxContainerSpec(id string, config *runtime.PodSandboxConfig) *runtimespec.Spec {
// TODO(random-liu): [P0] Get command from image config.
pauseCommand := []string{"sh", "-c", "while true; do sleep 1000000000; done"}

func (c *criContainerdService) generateSandboxContainerSpec(id string, config *runtime.PodSandboxConfig,
imageConfig *imagespec.ImageConfig) (*runtimespec.Spec, error) {
// Creates a spec Generator with the default spec.
// TODO(random-liu): [P1] Compare the default settings with docker and containerd default.
g := generate.New()

// Set relative root path.
g.SetRootPath(relativeRootfsPath)
// Apply default config from image config.
for _, e := range imageConfig.Env {
kv := strings.Split(e, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("invalid environment variable in image config %+v", imageConfig)
}
g.AddProcessEnv(kv[0], kv[1])
}

if imageConfig.WorkingDir != "" {
g.SetProcessCwd(imageConfig.WorkingDir)
}

if len(imageConfig.Entrypoint) == 0 {
// Pause image must have entrypoint.
return nil, fmt.Errorf("invalid empty entrypoint in image config %+v", imageConfig)
}
// Set process commands.
g.SetProcessArgs(pauseCommand)
g.SetProcessArgs(append(imageConfig.Entrypoint, imageConfig.Cmd...))

// Set relative root path.
g.SetRootPath(relativeRootfsPath)

// Make root of sandbox container read-only.
g.SetRootReadonly(true)
Expand Down Expand Up @@ -276,5 +298,5 @@ func (c *criContainerdService) generateSandboxContainerSpec(id string, config *r

// TODO(random-liu): [P1] Set default sandbox container resource limit.

return g.Spec()
return g.Spec(), nil
}
Loading