Skip to content

Commit 71734f3

Browse files
authored
Merge pull request containerd#56 from Random-Liu/add-container-logging
Add container logging support.
2 parents e10cdbb + 17ca29a commit 71734f3

File tree

8 files changed

+338
-24
lines changed

8 files changed

+338
-24
lines changed

pkg/server/agents/agents.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agents
18+
19+
import "io"
20+
21+
// StreamType is the type of the stream, stdout/stderr.
22+
type StreamType string
23+
24+
const (
25+
// Stdout stream type.
26+
Stdout StreamType = "stdout"
27+
// Stderr stream type.
28+
Stderr StreamType = "stderr"
29+
)
30+
31+
// Agent is the a running agent perform a specific task, e.g. redirect and
32+
// decorate log, redirect stream etc.
33+
type Agent interface {
34+
// Start starts the logger.
35+
Start() error
36+
}
37+
38+
// AgentFactory is the factory to create required agents.
39+
type AgentFactory interface {
40+
// NewSandboxLogger creates a sandbox logging agent.
41+
NewSandboxLogger(io.ReadCloser) Agent
42+
// NewContainerLogger creates a container logging agent.
43+
NewContainerLogger(string, StreamType, io.ReadCloser) Agent
44+
}
45+
46+
type agentFactory struct{}
47+
48+
// NewAgentFactory creates a new agent factory.
49+
func NewAgentFactory() AgentFactory {
50+
return &agentFactory{}
51+
}

pkg/server/agents/logger.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agents
18+
19+
import (
20+
"bufio"
21+
"bytes"
22+
"fmt"
23+
"io"
24+
"io/ioutil"
25+
"os"
26+
"time"
27+
28+
"github.com/golang/glog"
29+
)
30+
31+
const (
32+
// delimiter used in CRI logging format.
33+
delimiter = ' '
34+
// eof is end-of-line.
35+
eol = '\n'
36+
// timestampFormat is the timestamp format used in CRI logging format.
37+
timestampFormat = time.RFC3339Nano
38+
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
39+
// POSIX.1 says that write less than PIPE_BUF is atmoic.
40+
pipeBufSize = 4096
41+
// bufSize is the size of the read buffer.
42+
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
43+
)
44+
45+
// sandboxLogger is the log agent used for sandbox.
46+
// It discards sandbox all output for now.
47+
type sandboxLogger struct {
48+
rc io.ReadCloser
49+
}
50+
51+
func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent {
52+
return &sandboxLogger{rc: rc}
53+
}
54+
55+
func (s *sandboxLogger) Start() error {
56+
go func() {
57+
// Discard the output for now.
58+
io.Copy(ioutil.Discard, s.rc) // nolint: errcheck
59+
s.rc.Close()
60+
}()
61+
return nil
62+
}
63+
64+
// containerLogger is the log agent used for container.
65+
// It redirect container log into CRI log file, and decorate the log
66+
// line into CRI defined format.
67+
type containerLogger struct {
68+
path string
69+
stream StreamType
70+
rc io.ReadCloser
71+
}
72+
73+
func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent {
74+
return &containerLogger{
75+
path: path,
76+
stream: stream,
77+
rc: rc,
78+
}
79+
}
80+
81+
func (c *containerLogger) Start() error {
82+
glog.V(4).Infof("Start reading log file %q", c.path)
83+
wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
84+
if err != nil {
85+
return fmt.Errorf("failed to open log file %q: %v", c.path, err)
86+
}
87+
go c.redirectLogs(wc)
88+
return nil
89+
}
90+
91+
func (c *containerLogger) redirectLogs(wc io.WriteCloser) {
92+
defer c.rc.Close()
93+
defer wc.Close()
94+
streamBytes := []byte(c.stream)
95+
delimiterBytes := []byte{delimiter}
96+
r := bufio.NewReaderSize(c.rc, bufSize)
97+
for {
98+
// TODO(random-liu): Better define CRI log format, and escape newline in log.
99+
lineBytes, _, err := r.ReadLine()
100+
if err == io.EOF {
101+
glog.V(4).Infof("Finish redirecting log file %q", c.path)
102+
return
103+
}
104+
if err != nil {
105+
glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err)
106+
return
107+
}
108+
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
109+
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes)
110+
data = append(data, eol)
111+
if _, err := wc.Write(data); err != nil {
112+
glog.Errorf("Fail to write log line %q: %v", data, err)
113+
}
114+
// Continue on write error to drain the input.
115+
}
116+
}

pkg/server/agents/logger_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agents
18+
19+
import (
20+
"bytes"
21+
"io/ioutil"
22+
"strings"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
// writeCloserBuffer is a writecloser wrapper for bytes.Buffer
31+
// with a nop closer.
32+
type writeCloserBuffer struct {
33+
*bytes.Buffer
34+
}
35+
36+
// nop close
37+
func (*writeCloserBuffer) Close() error { return nil }
38+
39+
func TestRedirectLogs(t *testing.T) {
40+
f := NewAgentFactory()
41+
// f.NewContainerLogger(
42+
for desc, test := range map[string]struct {
43+
input string
44+
stream StreamType
45+
content []string
46+
}{
47+
"stdout log": {
48+
input: "test stdout log 1\ntest stdout log 2",
49+
stream: Stdout,
50+
content: []string{
51+
"test stdout log 1",
52+
"test stdout log 2",
53+
},
54+
},
55+
"stderr log": {
56+
input: "test stderr log 1\ntest stderr log 2",
57+
stream: Stderr,
58+
content: []string{
59+
"test stderr log 1",
60+
"test stderr log 2",
61+
},
62+
},
63+
"long log": {
64+
input: strings.Repeat("a", bufSize+10) + "\n",
65+
stream: Stdout,
66+
content: []string{
67+
strings.Repeat("a", bufSize),
68+
strings.Repeat("a", 10),
69+
},
70+
},
71+
} {
72+
t.Logf("TestCase %q", desc)
73+
rc := ioutil.NopCloser(strings.NewReader(test.input))
74+
c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger)
75+
wc := &writeCloserBuffer{bytes.NewBuffer(nil)}
76+
c.redirectLogs(wc)
77+
output := wc.String()
78+
lines := strings.Split(output, "\n")
79+
lines = lines[:len(lines)-1] // Discard empty string after last \n
80+
assert.Len(t, lines, len(test.content))
81+
for i := range lines {
82+
fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3)
83+
require.Len(t, fields, 3)
84+
_, err := time.Parse(timestampFormat, fields[0])
85+
assert.NoError(t, err)
86+
assert.EqualValues(t, test.stream, fields[1])
87+
assert.Equal(t, test.content[i], fields[2])
88+
}
89+
}
90+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package testing
18+
19+
import (
20+
"io"
21+
22+
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
23+
)
24+
25+
// FakeAgent is a fake agent for test.
26+
type FakeAgent struct{}
27+
28+
// Start always return nil not.
29+
// TODO(random-liu): Inject error and test error.
30+
func (f *FakeAgent) Start() error {
31+
return nil
32+
}
33+
34+
// FakeAgentFactory is a fake agent factory for test.
35+
type FakeAgentFactory struct{}
36+
37+
// NewFakeAgentFactory creates fake agent factory.
38+
func NewFakeAgentFactory() agents.AgentFactory {
39+
return &FakeAgentFactory{}
40+
}
41+
42+
// NewSandboxLogger creates a fake agent as sandbox logger.
43+
func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent {
44+
return &FakeAgent{}
45+
}
46+
47+
// NewContainerLogger creates a fake agent as container logger.
48+
func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent {
49+
return &FakeAgent{}
50+
}

pkg/server/container_start.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"io"
2323
"os"
24+
"path/filepath"
2425
"time"
2526

2627
"github.com/containerd/containerd/api/services/execution"
@@ -35,6 +36,7 @@ import (
3536
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
3637

3738
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
39+
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
3840
)
3941

4042
// StartContainer starts the container.
@@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
155157
w.Close()
156158
}(stdinPipe)
157159
}
158-
go func(r io.ReadCloser) {
159-
io.Copy(os.Stdout, r) // nolint: errcheck
160-
r.Close()
161-
}(stdoutPipe)
162-
// Only redirect stderr when there is no tty.
163-
if !config.GetTty() {
164-
go func(r io.ReadCloser) {
165-
io.Copy(os.Stderr, r) // nolint: errcheck
166-
r.Close()
167-
}(stderrPipe)
160+
if config.GetLogPath() != "" {
161+
// Only generate container log when log path is specified.
162+
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
163+
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
164+
return fmt.Errorf("failed to start container stdout logger: %v", err)
165+
}
166+
// Only redirect stderr when there is no tty.
167+
if !config.GetTty() {
168+
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
169+
return fmt.Errorf("failed to start container stderr logger: %v", err)
170+
}
171+
}
168172
}
169173

170174
// Get rootfs mounts.

0 commit comments

Comments
 (0)