@@ -17,14 +17,107 @@ limitations under the License.
1717package server
1818
1919import (
20+ "bytes"
2021 "errors"
22+ "fmt"
23+ "io"
24+ "os/exec"
25+ "strings"
2126
27+ "github.com/containerd/containerd/api/services/tasks/v1"
28+ "github.com/containerd/containerd/api/types/task"
29+ "github.com/golang/glog"
2230 "golang.org/x/net/context"
23-
2431 "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
2532)
2633
2734// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
28- func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (* runtime.PortForwardResponse , error ) {
29- return nil , errors .New ("not implemented" )
35+ func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (retRes * runtime.PortForwardResponse , retErr error ) {
36+ glog .V (2 ).Infof ("Portforward for sandbox %q port %v" , r .GetPodSandboxId (), r .GetPort ())
37+ defer func () {
38+ if retErr == nil {
39+ glog .V (2 ).Infof ("Portforward for %q returns URL %q" , r .GetPodSandboxId (), retRes .GetUrl ())
40+ }
41+ }()
42+
43+ sandbox , err := c .sandboxStore .Get (r .GetPodSandboxId ())
44+ if err != nil {
45+ return nil , fmt .Errorf ("unable to find sandbox: %v" , err )
46+ }
47+ id := sandbox .ID
48+
49+ info , err := c .taskService .Get (ctx , & tasks.GetTaskRequest {ContainerID : id })
50+ if err != nil && ! isContainerdGRPCNotFoundError (err ) {
51+ return nil , fmt .Errorf ("unable to get sandbox container info: %v" , err )
52+ }
53+
54+ if info .Task .Status != task .StatusRunning {
55+ return nil , errors .New ("sandbox container is not running" )
56+ }
57+ // TODO(random-liu): Verify that ports are exposed.
58+ return c .streamServer .GetPortForward (r )
59+ }
60+
61+ // portForward requires `nsenter` and `socat` on the node, it uses `nsenter` to enter the
62+ // sandbox namespace, and run `socat` inside the namespace to forward stream for a specific
63+ // port. The `socat` command keeps running until it exits or client disconnect.
64+ func (c * criContainerdService ) portForward (id string , port int32 , stream io.ReadWriteCloser ) error {
65+ s , err := c .sandboxStore .Get (id )
66+ if err != nil {
67+ return fmt .Errorf ("unable to find sandbox in store: %v" , err )
68+ }
69+ pid := s .Pid
70+
71+ socat , err := exec .LookPath ("socat" )
72+ if err != nil {
73+ return fmt .Errorf ("unable to find socat: %v" , err )
74+ }
75+
76+ // Check following links for meaning of the options:
77+ // * socat: https://linux.die.net/man/1/socat
78+ // * nsenter: http://man7.org/linux/man-pages/man1/nsenter.1.html
79+ args := []string {"-t" , fmt .Sprintf ("%d" , pid ), "-n" , socat ,
80+ "-" , fmt .Sprintf ("TCP4:localhost:%d" , port )}
81+
82+ nsenter , err := exec .LookPath ("nsenter" )
83+ if err != nil {
84+ return fmt .Errorf ("unable to find nsenter: %v" , err )
85+ }
86+
87+ glog .V (2 ).Infof ("Executing port forwarding command: %s %s" , nsenter , strings .Join (args , " " ))
88+
89+ cmd := exec .Command (nsenter , args ... )
90+ cmd .Stdout = stream
91+
92+ stderr := new (bytes.Buffer )
93+ cmd .Stderr = stderr
94+
95+ // If we use Stdin, command.Run() won't return until the goroutine that's copying
96+ // from stream finishes. Unfortunately, if you have a client like telnet connected
97+ // via port forwarding, as long as the user's telnet client is connected to the user's
98+ // local listener that port forwarding sets up, the telnet session never exits. This
99+ // means that even if socat has finished running, command.Run() won't ever return
100+ // (because the client still has the connection and stream open).
101+ //
102+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
103+ // when the command (socat) exits.
104+ in , err := cmd .StdinPipe ()
105+ if err != nil {
106+ return fmt .Errorf ("unable to create stdin pipe: %v" , err )
107+ }
108+ go func () {
109+ if _ , err := io .Copy (in , stream ); err != nil {
110+ glog .Errorf ("Failed to copy port forward input for %q port %d: %v" , id , port , err )
111+ }
112+ in .Close ()
113+ glog .V (4 ).Infof ("Finish copy port forward input for %q port %d: %v" , id , port )
114+ }()
115+
116+ if err := cmd .Run (); err != nil {
117+ return fmt .Errorf ("nsenter command returns error: %v, stderr: %q" , err , stderr .String ())
118+ }
119+
120+ glog .V (2 ).Infof ("Finish port forwarding for %q port %d" , id , port )
121+
122+ return nil
30123}
0 commit comments