@@ -17,14 +17,99 @@ limitations under the License.
1717package server
1818
1919import (
20+ "bytes"
2021 "errors"
22+ "fmt"
23+ "io"
24+ "os/exec"
25+ "strings"
2126
27+ "github.com/containerd/containerd/api/services/tasks/v1"
28+ "github.com/containerd/containerd/api/types/task"
29+ "github.com/golang/glog"
2230 "golang.org/x/net/context"
23-
2431 "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
2532)
2633
2734// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
28- func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (* runtime.PortForwardResponse , error ) {
29- return nil , errors .New ("not implemented" )
35+ func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (retRes * runtime.PortForwardResponse , retErr error ) {
36+ glog .V (2 ).Infof ("Portforward for sandbox %q port %v" , r .GetPodSandboxId (), r .GetPort ())
37+ defer func () {
38+ if retErr == nil {
39+ glog .V (2 ).Infof ("Portforward for %q returns URL %q" , r .GetPodSandboxId (), retRes .GetUrl ())
40+ }
41+ }()
42+
43+ sandbox , err := c .sandboxStore .Get (r .GetPodSandboxId ())
44+ if err != nil {
45+ return nil , fmt .Errorf ("unable to find sandbox: %v" , err )
46+ }
47+ id := sandbox .ID
48+
49+ info , err := c .taskService .Get (ctx , & tasks.GetTaskRequest {ContainerID : id })
50+ if err != nil && ! isContainerdGRPCNotFoundError (err ) {
51+ return nil , fmt .Errorf ("unable to get sandbox container info: %v" , err )
52+ }
53+
54+ if info .Task .Status != task .StatusRunning {
55+ return nil , errors .New ("sandbox container is not running" )
56+ }
57+ // TODO(random-liu): Verify that ports are exposed.
58+ return c .streamServer .GetPortForward (r )
59+ }
60+
61+ // portForward requires `nsenter` and `socat` on the node.
62+ func (c * criContainerdService ) portForward (id string , port int32 , stream io.ReadWriteCloser ) error {
63+ s , err := c .sandboxStore .Get (id )
64+ if err != nil {
65+ return fmt .Errorf ("unable to find sandbox in store: %v" , err )
66+ }
67+ pid := s .Pid
68+
69+ socat , err := exec .LookPath ("socat" )
70+ if err != nil {
71+ return fmt .Errorf ("unable to find socat: %v" , err )
72+ }
73+
74+ args := []string {"-t" , fmt .Sprintf ("%d" , pid ), "-n" , socat ,
75+ "-" , fmt .Sprintf ("TCP4:localhost:%d" , port )}
76+
77+ nsenter , err := exec .LookPath ("nsenter" )
78+ if err != nil {
79+ return fmt .Errorf ("unable to find nsenter: %v" , err )
80+ }
81+
82+ glog .V (2 ).Infof ("executing port forwarding command: %s %s" , nsenter , strings .Join (args , " " ))
83+
84+ cmd := exec .Command (nsenter , args ... )
85+ cmd .Stdout = stream
86+
87+ stderr := new (bytes.Buffer )
88+ cmd .Stderr = stderr
89+
90+ // If we use Stdin, command.Run() won't return until the goroutine that's copying
91+ // from stream finishes. Unfortunately, if you have a client like telnet connected
92+ // via port forwarding, as long as the user's telnet client is connected to the user's
93+ // local listener that port forwarding sets up, the telnet session never exits. This
94+ // means that even if socat has finished running, command.Run() won't ever return
95+ // (because the client still has the connection and stream open).
96+ //
97+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
98+ // when the command (socat) exits.
99+ in , err := cmd .StdinPipe ()
100+ if err != nil {
101+ return fmt .Errorf ("unable to create stdin pipe: %v" , err )
102+ }
103+ go func () {
104+ if _ , err := io .Copy (in , stream ); err != nil {
105+ glog .Errorf ("Failed to copy port forward stdin for %q: %v" , id , err )
106+ }
107+ in .Close ()
108+ }()
109+
110+ if err := cmd .Run (); err != nil {
111+ return fmt .Errorf ("nsenter command returns error: %v, stderr: %q" , err , stderr .String ())
112+ }
113+
114+ return nil
30115}
0 commit comments