@@ -17,15 +17,136 @@ limitations under the License.
1717package server
1818
1919import (
20- "errors"
20+ "bytes"
21+ "encoding/json"
22+ "fmt"
23+ "io"
24+ "io/ioutil"
2125
26+ "github.com/containerd/containerd/api/services/execution"
27+ "github.com/containerd/containerd/api/types/task"
28+ prototypes "github.com/gogo/protobuf/types"
29+ "github.com/golang/glog"
30+ runtimespec "github.com/opencontainers/runtime-spec/specs-go"
2231 "golang.org/x/net/context"
23-
2432 runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
2533)
2634
2735// ExecSync executes a command in the container, and returns the stdout output.
2836// If command exits with a non-zero exit code, an error is returned.
29- func (c * criContainerdService ) ExecSync (ctx context.Context , r * runtime.ExecSyncRequest ) (* runtime.ExecSyncResponse , error ) {
30- return nil , errors .New ("not implemented" )
37+ func (c * criContainerdService ) ExecSync (ctx context.Context , r * runtime.ExecSyncRequest ) (retRes * runtime.ExecSyncResponse , retErr error ) {
38+ glog .V (2 ).Infof ("ExecSync for %q with command %+v and timeout %d (s)" , r .GetContainerId (), r .GetCmd (), r .GetTimeout ())
39+ defer func () {
40+ if retErr == nil {
41+ glog .V (2 ).Infof ("ExecSync for %q returns with exit code %d" , r .GetContainerId (), retRes .GetExitCode ())
42+ glog .V (4 ).Infof ("ExecSync for %q outputs - stdout: %q, stderr: %q" , r .GetContainerId (),
43+ retRes .GetStdout (), retRes .GetStderr ())
44+ }
45+ }()
46+
47+ // Get container config from container store.
48+ meta , err := c .containerStore .Get (r .GetContainerId ())
49+ if err != nil {
50+ return nil , fmt .Errorf ("an error occurred when try to find container %q: %v" , r .GetContainerId (), err )
51+ }
52+ id := meta .ID
53+
54+ if meta .State () != runtime .ContainerState_CONTAINER_RUNNING {
55+ return nil , fmt .Errorf ("container %q is in %s state" , id , criContainerStateToString (meta .State ()))
56+ }
57+
58+ // TODO(random-liu): Replace the following logic with containerd client and add unit test.
59+ // Prepare streaming pipes.
60+ execDir , err := ioutil .TempDir (getContainerRootDir (c .rootDir , id ), "exec" )
61+ if err != nil {
62+ return nil , fmt .Errorf ("failed to create exec streaming directory: %v" , err )
63+ }
64+ defer func () {
65+ if err = c .os .RemoveAll (execDir ); err != nil {
66+ glog .Errorf ("Failed to remove exec streaming directory %q: %v" , execDir , err )
67+ }
68+ }()
69+ _ , stdout , stderr := getStreamingPipes (execDir )
70+ _ , stdoutPipe , stderrPipe , err := c .prepareStreamingPipes (ctx , "" , stdout , stderr )
71+ if err != nil {
72+ return nil , fmt .Errorf ("failed to prepare streaming pipes: %v" , err )
73+ }
74+ defer stdoutPipe .Close ()
75+ defer stderrPipe .Close ()
76+
77+ // Start redirecting exec output.
78+ stdoutBuf , stderrBuf := new (bytes.Buffer ), new (bytes.Buffer )
79+ go io .Copy (stdoutBuf , stdoutPipe ) // nolint: errcheck
80+ go io .Copy (stderrBuf , stderrPipe ) // nolint: errcheck
81+
82+ // Get containerd event client first, so that we won't miss any events.
83+ // TODO(random-liu): Handle this in event handler. Create an events client for
84+ // each exec introduces unnecessary overhead.
85+ cancellable , cancel := context .WithCancel (ctx )
86+ events , err := c .taskService .Events (cancellable , & execution.EventsRequest {})
87+ if err != nil {
88+ return nil , fmt .Errorf ("failed to get containerd event: %v" , err )
89+ }
90+
91+ spec := & meta .Spec .Process
92+ spec .Args = r .GetCmd ()
93+ rawSpec , err := json .Marshal (spec )
94+ if err != nil {
95+ return nil , fmt .Errorf ("failed to marshal oci spec %+v: %v" , spec , err )
96+ }
97+
98+ resp , err := c .taskService .Exec (ctx , & execution.ExecRequest {
99+ ContainerID : id ,
100+ Terminal : false ,
101+ Stdout : stdout ,
102+ Stderr : stderr ,
103+ Spec : & prototypes.Any {
104+ TypeUrl : runtimespec .Version ,
105+ Value : rawSpec ,
106+ },
107+ })
108+ if err != nil {
109+ return nil , fmt .Errorf ("failed to exec in container %q: %v" , id , err )
110+ }
111+ exitCode , err := waitContainerExec (cancel , events , id , resp .Pid , r .GetTimeout ())
112+ if err != nil {
113+ return nil , fmt .Errorf ("failed to wait for exec in container %q to finish: %v" , id , err )
114+ }
115+
116+ // TODO(random-liu): Make sure stdout/stderr are drained.
117+ return & runtime.ExecSyncResponse {
118+ Stdout : stdoutBuf .Bytes (),
119+ Stderr : stderrBuf .Bytes (),
120+ ExitCode : int32 (exitCode ),
121+ }, nil
122+ }
123+
124+ // waitContainerExec waits for container exec to finish and returns the exit code.
125+ func waitContainerExec (cancel context.CancelFunc , events execution.Tasks_EventsClient , id string ,
126+ pid uint32 , timeout int64 ) (uint32 , error ) {
127+ // TODO(random-liu): [P1] Support ExecSync timeout.
128+ // TODO(random-liu): Delete process after containerd upgrade.
129+ defer func () {
130+ // Stop events and drain the event channel. grpc-go#188
131+ cancel ()
132+ for {
133+ _ , err := events .Recv ()
134+ if err != nil {
135+ break
136+ }
137+ }
138+ }()
139+ for {
140+ e , err := events .Recv ()
141+ if err != nil {
142+ // Return non-zero exit code just in case.
143+ return unknownExitCode , err
144+ }
145+ if e .Type != task .Event_EXIT {
146+ continue
147+ }
148+ if e .ID == id && e .Pid == pid {
149+ return e .ExitStatus , nil
150+ }
151+ }
31152}
0 commit comments