Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
9 changes: 8 additions & 1 deletion cmd/cri-containerd/cri_containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ func main() {
}

glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath)
service, err := server.NewCRIContainerdService(o.ContainerdEndpoint, o.RootDir, o.NetworkPluginBinDir, o.NetworkPluginConfDir)
service, err := server.NewCRIContainerdService(
o.ContainerdEndpoint,
o.RootDir,
o.NetworkPluginBinDir,
o.NetworkPluginConfDir,
o.StreamServerAddress,
o.StreamServerPort,
)
if err != nil {
glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err)
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/cri-containerd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type CRIContainerdOptions struct {
NetworkPluginBinDir string
// NetworkPluginConfDir is the directory in which the admin places a CNI conf.
NetworkPluginConfDir string
// StreamServerAddress is the ip address streaming server is listening on.
StreamServerAddress string
// StreamServerPort is the port streaming server is listening on.
StreamServerPort string
}

// NewCRIContainerdOptions returns a reference to CRIContainerdOptions
Expand All @@ -63,6 +67,10 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) {
"/etc/cni/net.d", "The directory for putting network binaries.")
fs.StringVar(&c.NetworkPluginConfDir, "network-conf-dir",
"/opt/cni/bin", "The directory for putting network plugin configuration files.")
fs.StringVar(&c.StreamServerAddress, "stream-addr",
"", "The ip address streaming server is listening on. Default host interface is used if this is empty.")
fs.StringVar(&c.StreamServerPort, "stream-port",
"10010", "The port streaming server is listening on.")
}

// InitFlags must be called after adding all cli options flags are defined and
Expand Down
2 changes: 1 addition & 1 deletion hack/test-cri.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/..
# FOCUS focuses the test to run.
FOCUS=${FOCUS:-}
# SKIP skips the test to skip.
SKIP=${SKIP:-"Streaming|RunAsUser|host port"}
SKIP=${SKIP:-"attach|portforward|RunAsUser|host port"}
REPORT_DIR=${REPORT_DIR:-"/tmp"}

if [[ -z "${GOPATH}" ]]; then
Expand Down
5 changes: 5 additions & 0 deletions hack/update-vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ set -o pipefail
# TODO(random-liu): Remove this after #106 is resolved.
ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/..
cd ${ROOT}
echo "Replace invalid imports..."
find vendor/ -name *.go | xargs sed -i 's/"github.com\/Sirupsen\/logrus"/"github.com\/sirupsen\/logrus"/g'

echo "Sort vendor.conf..."
sort vendor.conf -o vendor.conf

echo "Please commit the change made by this file..."
24 changes: 20 additions & 4 deletions pkg/server/container_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,30 @@ limitations under the License.
package server

import (
"errors"
"fmt"

"github.com/golang/glog"
"golang.org/x/net/context"

"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (retRes *runtime.ExecResponse, retErr error) {
glog.V(2).Infof("Exec for %q with command %+v, tty %v and stdin %v",
r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin())
defer func() {
if retErr == nil {
glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.Url)
}
}()

cntr, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("failed to find container in store: %v", err)
}
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
}
return c.streamServer.GetExec(r)
}
79 changes: 61 additions & 18 deletions pkg/server/container_execsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package server
import (
"bytes"
"fmt"
"io"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/typeurl"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)

Expand All @@ -40,12 +43,45 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
}
}()

var stdout, stderr bytes.Buffer
exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{
cmd: r.GetCmd(),
stdout: &stdout,
stderr: &stderr,
timeout: time.Duration(r.GetTimeout()) * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to exec in container: %v", err)
}

return &runtime.ExecSyncResponse{
Stdout: stdout.Bytes(),
Stderr: stderr.Bytes(),
ExitCode: int32(*exitCode),
}, nil
}

// execOptions specifies how to execute command in container.
type execOptions struct {
cmd []string
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
resize <-chan remotecommand.TerminalSize
timeout time.Duration
}

// execInContainer executes a command inside the container synchronously, and
// redirects stdio stream properly.
// TODO(random-liu): Support timeout.
func (c *criContainerdService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) {
// Get container from our container store.
cntr, err := c.containerStore.Get(r.GetContainerId())
cntr, err := c.containerStore.Get(id)
if err != nil {
return nil, fmt.Errorf("failed to find container in store: %v", err)
}
id := cntr.ID
id = cntr.ID

state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
Expand All @@ -65,14 +101,21 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
if err != nil {
return nil, fmt.Errorf("failed to load task: %v", err)
}

pspec := spec.Process
pspec.Args = r.GetCmd()
pspec.Args = opts.cmd
pspec.Terminal = opts.tty

if opts.stdin == nil {
// Create empty buffer if stdin is nil.
opts.stdin = new(bytes.Buffer)
}
execID := generateID()
stdinBuf, stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer), new(bytes.Buffer)
io := containerd.NewIOWithTerminal(stdinBuf, stdoutBuf, stderrBuf, pspec.Terminal)
process, err := task.Exec(ctx, execID, pspec, io)
process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal(
opts.stdin,
opts.stdout,
opts.stderr,
opts.tty,
))
if err != nil {
return nil, fmt.Errorf("failed to create exec %q: %v", execID, err)
}
Expand All @@ -82,6 +125,12 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
}
}()

handleResizing(opts.resize, func(size remotecommand.TerminalSize) {
if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err)
}
})

// Get containerd event client first, so that we won't miss any events.
// TODO(random-liu): Add filter to only subscribe events of the exec process.
// TODO(random-liu): Use `Wait` after is fixed. (containerd#1279, containerd#1287)
Expand All @@ -100,38 +149,32 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
if err != nil {
return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

always throw out the exit code if there is an error? unknownExitCode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should let waitContainerExec also return pointer, so that we don't need the unknownExitStatus for now.

We don't use the exit code when there is an error now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right.. just trying to figure out if that's on purpose. Seems at least possible to have an exit code and an error. Not a fan of throwing it out unless we need to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
// TODO(random-liu): [P1] Deal with timeout, kill and wait again on timeout.

// Wait for the io to be drained.
process.IO().Wait()

return &runtime.ExecSyncResponse{
Stdout: stdoutBuf.Bytes(),
Stderr: stderrBuf.Bytes(),
ExitCode: int32(exitCode),
}, nil
return exitCode, nil
}

// waitContainerExec waits for container exec to finish and returns the exit code.
func (c *criContainerdService) waitContainerExec(eventstream events.Events_SubscribeClient, id string,
execID string) (uint32, error) {
execID string) (*uint32, error) {
for {
evt, err := eventstream.Recv()
if err != nil {
// Return non-zero exit code just in case.
return unknownExitCode, err
return nil, err
}
// Continue until the event received is of type task exit.
if !typeurl.Is(evt.Event, &events.TaskExit{}) {
continue
}
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
return unknownExitCode, err
return nil, err
}
e := any.(*events.TaskExit)
if e.ContainerID == id && e.ID == execID {
return e.ExitStatus, nil
return &e.ExitStatus, nil
}
}
}
2 changes: 0 additions & 2 deletions pkg/server/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ const (
errorExitReason = "Error"
// oomExitReason is the exit reason when process in container is oom killed.
oomExitReason = "OOMKilled"
// unknownExitCode is the exit code when exit reason is unknown.
unknownExitCode = 255
)

const (
Expand Down
25 changes: 21 additions & 4 deletions pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"github.com/containerd/containerd/images"
diffservice "github.com/containerd/containerd/services/diff"
"github.com/containerd/containerd/snapshot"
"github.com/golang/glog"
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
healthapi "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"

osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
Expand Down Expand Up @@ -84,6 +86,8 @@ type criContainerdService struct {
// imageStoreService is the containerd service to store and track
// image metadata.
imageStoreService images.Store
// eventsService is the containerd task service client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious why this one is better up here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just group containerd services together. No particular reason. :)

eventService events.EventsClient
// versionService is the containerd version service client.
versionService versionapi.VersionClient
// healthService is the healthcheck service of containerd grpc server.
Expand All @@ -94,12 +98,14 @@ type criContainerdService struct {
agentFactory agents.AgentFactory
// client is an instance of the containerd client
client *containerd.Client
// eventsService is the containerd task service client
eventService events.EventsClient
// streamServer is the streaming server serves container streaming request.
streamServer streaming.Server
}

// NewCRIContainerdService returns a new instance of CRIContainerdService
func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
// TODO(random-liu): Add cri-containerd server config to get rid of the long arg list.
func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir,
streamAddress, streamPort string) (CRIContainerdService, error) {
// TODO(random-liu): [P2] Recover from runtime state and checkpoint.

client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
Expand All @@ -119,6 +125,7 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
containerService: client.ContainerService(),
taskService: client.TaskService(),
imageStoreService: client.ImageService(),
eventService: client.EventService(),
contentStoreService: client.ContentStore(),
// Use daemon default snapshotter.
snapshotService: client.SnapshotService(""),
Expand All @@ -127,7 +134,6 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
healthService: client.HealthService(),
agentFactory: agents.NewAgentFactory(),
client: client,
eventService: client.EventService(),
}

netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)
Expand All @@ -136,9 +142,20 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
}
c.netPlugin = netPlugin

// prepare streaming server
c.streamServer, err = newStreamServer(c, streamAddress, streamPort)
if err != nil {
return nil, fmt.Errorf("failed to create stream server: %v", err)
}

return c, nil
}

func (c *criContainerdService) Start() {
c.startEventMonitor()
go func() {
if err := c.streamServer.Start(true); err != nil {
glog.Errorf("Failed to start streaming server: %v", err)
}
}()
}
Loading