This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
12 changes: 9 additions & 3 deletions pkg/metadata/container.go
@@ -19,9 +19,10 @@ package metadata
import (
"encoding/json"

"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"

runtimespec "github.com/opencontainers/runtime-spec/specs-go"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"

"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
)

// The code is very similar with sandbox.go, but there is no template support
@@ -71,8 +72,13 @@ type ContainerMetadata struct {
// In fact, this field doesn't need to be checkpointed.
// TODO(random-liu): Skip this during serialization when we put object
// into the store directly.
// TODO(random-liu): Reset this field to false during state recoverry.
// TODO(random-liu): Reset this field to false during state recovery.
Removing bool
// TODO(random-liu): Remove following field after switching to new containerd
// client.
// Not including them in unit test now because they will be removed soon.
// Spec is the oci runtime spec used to run the container.
Spec *runtimespec.Spec
}

// State returns current state of the container based on the metadata.
1 change: 1 addition & 0 deletions pkg/server/container_create.go
@@ -160,6 +160,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C

// Update container CreatedAt.
meta.CreatedAt = time.Now().UnixNano()
meta.Spec = spec
// Add container into container store.
if err := c.containerStore.Create(meta); err != nil {
return nil, fmt.Errorf("failed to add container metadata %+v into store: %v",
16 changes: 9 additions & 7 deletions pkg/server/container_create_test.go
@@ -565,13 +565,6 @@ func TestCreateContainer(t *testing.T) {
id := resp.GetContainerId()
assert.True(t, rootExists)
assert.Equal(t, getContainerRootDir(c.rootDir, id), rootPath, "root directory should be created")
meta, err := c.containerStore.Get(id)
assert.NoError(t, err)
require.NotNil(t, meta)
test.expectMeta.ID = id
// TODO(random-liu): Use fake clock to test CreatedAt.
test.expectMeta.CreatedAt = meta.CreatedAt
assert.Equal(t, test.expectMeta, meta, "container metadata should be created")

// Check runtime spec
containersCalls := fake.GetCalledDetails()
@@ -593,5 +586,14 @@ func TestCreateContainer(t *testing.T) {
Key: id,
Parent: testChainID,
}, prepareOpts, "prepare request should be correct")

meta, err := c.containerStore.Get(id)
assert.NoError(t, err)
require.NotNil(t, meta)
test.expectMeta.ID = id
// TODO(random-liu): Use fake clock to test CreatedAt.
test.expectMeta.CreatedAt = meta.CreatedAt
test.expectMeta.Spec = spec
assert.Equal(t, test.expectMeta, meta, "container metadata should be created")
}
}
129 changes: 125 additions & 4 deletions pkg/server/container_execsync.go
@@ -17,15 +17,136 @@ limitations under the License.
package server

import (
"errors"
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"

"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/task"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context"

runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
)

// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (retRes *runtime.ExecSyncResponse, retErr error) {
glog.V(2).Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout())
defer func() {
if retErr == nil {
glog.V(2).Infof("ExecSync for %q returns with exit code %d", r.GetContainerId(), retRes.GetExitCode())
glog.V(4).Infof("ExecSync for %q outputs - stdout: %q, stderr: %q", r.GetContainerId(),
retRes.GetStdout(), retRes.GetStderr())
}
}()

// Get container config from container store.
meta, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
id := meta.ID

if meta.State() != runtime.ContainerState_CONTAINER_RUNNING {
return nil, fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State()))
}

// TODO(random-liu): Replace the following logic with containerd client and add unit test.
// Prepare streaming pipes.
execDir, err := ioutil.TempDir(getContainerRootDir(c.rootDir, id), "exec")
if err != nil {
return nil, fmt.Errorf("failed to create exec streaming directory: %v", err)
}
defer func() {
if err = c.os.RemoveAll(execDir); err != nil {
glog.Errorf("Failed to remove exec streaming directory %q: %v", execDir, err)
}
}()
_, stdout, stderr := getStreamingPipes(execDir)
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
defer stdoutPipe.Close()
defer stderrPipe.Close()

// Start redirecting exec output.
stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer)
go io.Copy(stdoutBuf, stdoutPipe) // nolint: errcheck
go io.Copy(stderrBuf, stderrPipe) // nolint: errcheck

// Get containerd event client first, so that we won't miss any events.
// TODO(random-liu): Handle this in event handler. Create an events client for
// each exec introduces unnecessary overhead.
cancellable, cancel := context.WithCancel(ctx)
events, err := c.taskService.Events(cancellable, &execution.EventsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to get containerd event: %v", err)
}

spec := &meta.Spec.Process
spec.Args = r.GetCmd()
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}

resp, err := c.taskService.Exec(ctx, &execution.ExecRequest{
ContainerID: id,
Terminal: false,
Stdout: stdout,
Stderr: stderr,
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
})
if err != nil {
return nil, fmt.Errorf("failed to exec in container %q: %v", id, err)
}
exitCode, err := waitContainerExec(cancel, events, id, resp.Pid, r.GetTimeout())
if err != nil {
return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err)
Member: This return eats the unknownExitCode. What's that for if we are going to nil it here?

Member Author: Good catch! We do have some special logic here in dockershim. Let me check.

Member Author: If containerd dies during waiting, the exec process may still be running, so I think we should return an error instead of a 255 exit code with no error.

I still keep the unknownExitCode, because:

  1. We may still need it when adding the recovery logic after daemon restart.
  2. Just in case we use the 0 exit code by mistake.

Member: OK, will watch for the recovery logic code to see unknownExitCode in action then :-)
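
To make the contract discussed in this thread concrete, here is a minimal editorial sketch (not part of this PR's diff); the helper name checkExecResult is hypothetical. It only illustrates that a non-nil error from waitContainerExec means the exec state is unknown, so the accompanying unknownExitCode is a defensive placeholder rather than a reportable exit status.

// checkExecResult illustrates the distinction discussed above: a non-nil
// error means containerd may have died while the exec process keeps running,
// so the placeholder exit code must not be surfaced as a real exit status.
func checkExecResult(exitCode uint32, waitErr error) (uint32, error) {
    if waitErr != nil {
        // Propagate the error; callers should not trust exitCode here.
        return 0, waitErr
    }
    // Only a nil error makes exitCode meaningful, including a 0 exit.
    return exitCode, nil
}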

}

// TODO(random-liu): Make sure stdout/stderr are drained.
return &runtime.ExecSyncResponse{
Stdout: stdoutBuf.Bytes(),
Stderr: stderrBuf.Bytes(),
ExitCode: int32(exitCode),
}, nil
}
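
The TODO above about draining stdout/stderr could, for instance, be handled by tracking each io.Copy goroutine and waiting for it after the exec exits. A minimal sketch under that assumption (startOutputCopy is a hypothetical helper, not part of this PR; it assumes "sync" is added to the file's imports alongside the existing "bytes" and "io"):

// startOutputCopy starts copying an exec output pipe into buf and returns a
// function that blocks until the copy has finished, i.e. until the reader
// hits EOF (the write end is closed after the exec process exits) or errors.
func startOutputCopy(buf *bytes.Buffer, pipe io.Reader) (wait func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        io.Copy(buf, pipe) // nolint: errcheck
    }()
    return wg.Wait
}

ExecSync would then call the two returned wait functions after waitContainerExec succeeds and before building the ExecSyncResponse, so both buffers are complete when read.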

// waitContainerExec waits for container exec to finish and returns the exit code.
func waitContainerExec(cancel context.CancelFunc, events execution.Tasks_EventsClient, id string,
pid uint32, timeout int64) (uint32, error) {
// TODO(random-liu): [P1] Support ExecSync timeout.
// TODO(random-liu): Delete process after containerd upgrade.
defer func() {
// Stop events and drain the event channel. grpc-go#188
cancel()
for {
_, err := events.Recv()
if err != nil {
break
}
}
}()
for {
e, err := events.Recv()
if err != nil {
// Return non-zero exit code just in case.
return unknownExitCode, err
}
if e.Type != task.Event_EXIT {
continue
}
if e.ID == id && e.Pid == pid {
return e.ExitStatus, nil
}
}
}
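
One possible way to address the ExecSync timeout TODO above is to give the cancellable event-stream context a deadline, so that when the timeout elapses events.Recv() fails and waitContainerExec returns unknownExitCode together with an error. A minimal editorial sketch under that assumption (withExecTimeout is a hypothetical helper, not part of this PR; it assumes a "time" import is added):

// withExecTimeout derives the context used for the exec event stream. The CRI
// timeout is in seconds; 0 or a negative value means no limit.
func withExecTimeout(ctx context.Context, timeoutSeconds int64) (context.Context, context.CancelFunc) {
    if timeoutSeconds <= 0 {
        return context.WithCancel(ctx)
    }
    return context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
}

Whether a timed-out exec should also be killed rather than just abandoned is a separate question that this sketch does not answer.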
2 changes: 2 additions & 0 deletions pkg/server/helpers.go
@@ -50,6 +50,8 @@ const (
completeExitReason = "Completed"
// errorExitReason is the exit reason when container exits with code non-zero.
errorExitReason = "Error"
// unknownExitCode is the exit code when exit reason is unknown.
unknownExitCode = 255
)

const (