@@ -17,14 +17,109 @@ limitations under the License.
1717package server
1818
1919import (
20+ "bytes"
2021 "errors"
22+ "fmt"
23+ "io"
24+ "os/exec"
25+ "strings"
2126
27+ "github.com/containerd/containerd"
28+ "github.com/golang/glog"
2229 "golang.org/x/net/context"
23-
2430 "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
2531)
2632
2733// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
28- func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (* runtime.PortForwardResponse , error ) {
29- return nil , errors .New ("not implemented" )
34+ func (c * criContainerdService ) PortForward (ctx context.Context , r * runtime.PortForwardRequest ) (retRes * runtime.PortForwardResponse , retErr error ) {
35+ // TODO(random-liu): Run a socat container inside the sandbox to do portforward.
36+ glog .V (2 ).Infof ("Portforward for sandbox %q port %v" , r .GetPodSandboxId (), r .GetPort ())
37+ defer func () {
38+ if retErr == nil {
39+ glog .V (2 ).Infof ("Portforward for %q returns URL %q" , r .GetPodSandboxId (), retRes .GetUrl ())
40+ }
41+ }()
42+
43+ sandbox , err := c .sandboxStore .Get (r .GetPodSandboxId ())
44+ if err != nil {
45+ return nil , fmt .Errorf ("failed to find sandbox: %v" , err )
46+ }
47+
48+ t , err := sandbox .Container .Task (ctx , nil )
49+ if err != nil {
50+ return nil , fmt .Errorf ("failed to get sandbox container: %v" , err )
51+ }
52+ status , err := t .Status (ctx )
53+ if err != nil {
54+ return nil , fmt .Errorf ("failed to get sandbox container status: %v" , err )
55+ }
56+ if status .Status != containerd .Running {
57+ return nil , errors .New ("sandbox container is not running" )
58+ }
59+ // TODO(random-liu): Verify that ports are exposed.
60+ return c .streamServer .GetPortForward (r )
61+ }
62+
63+ // portForward requires `nsenter` and `socat` on the node, it uses `nsenter` to enter the
64+ // sandbox namespace, and run `socat` inside the namespace to forward stream for a specific
65+ // port. The `socat` command keeps running until it exits or client disconnect.
66+ func (c * criContainerdService ) portForward (id string , port int32 , stream io.ReadWriteCloser ) error {
67+ s , err := c .sandboxStore .Get (id )
68+ if err != nil {
69+ return fmt .Errorf ("failed to find sandbox in store: %v" , err )
70+ }
71+ pid := s .Pid
72+
73+ socat , err := exec .LookPath ("socat" )
74+ if err != nil {
75+ return fmt .Errorf ("failed to find socat: %v" , err )
76+ }
77+
78+ // Check following links for meaning of the options:
79+ // * socat: https://linux.die.net/man/1/socat
80+ // * nsenter: http://man7.org/linux/man-pages/man1/nsenter.1.html
81+ args := []string {"-t" , fmt .Sprintf ("%d" , pid ), "-n" , socat ,
82+ "-" , fmt .Sprintf ("TCP4:localhost:%d" , port )}
83+
84+ nsenter , err := exec .LookPath ("nsenter" )
85+ if err != nil {
86+ return fmt .Errorf ("failed to find nsenter: %v" , err )
87+ }
88+
89+ glog .V (2 ).Infof ("Executing port forwarding command: %s %s" , nsenter , strings .Join (args , " " ))
90+
91+ cmd := exec .Command (nsenter , args ... )
92+ cmd .Stdout = stream
93+
94+ stderr := new (bytes.Buffer )
95+ cmd .Stderr = stderr
96+
97+ // If we use Stdin, command.Run() won't return until the goroutine that's copying
98+ // from stream finishes. Unfortunately, if you have a client like telnet connected
99+ // via port forwarding, as long as the user's telnet client is connected to the user's
100+ // local listener that port forwarding sets up, the telnet session never exits. This
101+ // means that even if socat has finished running, command.Run() won't ever return
102+ // (because the client still has the connection and stream open).
103+ //
104+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
105+ // when the command (socat) exits.
106+ in , err := cmd .StdinPipe ()
107+ if err != nil {
108+ return fmt .Errorf ("failed to create stdin pipe: %v" , err )
109+ }
110+ go func () {
111+ if _ , err := io .Copy (in , stream ); err != nil {
112+ glog .Errorf ("Failed to copy port forward input for %q port %d: %v" , id , port , err )
113+ }
114+ in .Close ()
115+ glog .V (4 ).Infof ("Finish copy port forward input for %q port %d: %v" , id , port )
116+ }()
117+
118+ if err := cmd .Run (); err != nil {
119+ return fmt .Errorf ("nsenter command returns error: %v, stderr: %q" , err , stderr .String ())
120+ }
121+
122+ glog .V (2 ).Infof ("Finish port forwarding for %q port %d" , id , port )
123+
124+ return nil
30125}
0 commit comments