Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.

Commit 6c84a88

Browse files
committed
Add max_container_log_size
Signed-off-by: Lantao Liu <[email protected]>
1 parent 40007b3 commit 6c84a88

File tree

10 files changed

+382
-78
lines changed

10 files changed

+382
-78
lines changed

docs/config.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows:
2727
# enable_tls_streaming enables the TLS streaming support.
2828
enable_tls_streaming = false
2929

30+
# max_container_log_line_size is the maximum log line size in bytes for a container.
31+
# Log line longer than the limit will be split into multiple lines. -1 means no
32+
# limit.
33+
max_container_log_line_size = 16384
34+
3035
# "plugins.cri.containerd" contains config related to containerd
3136
[plugins.cri.containerd]
3237

hack/verify-lint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ set -o pipefail
2020
for d in $(find . -type d -a \( -iwholename './pkg*' -o -iwholename './cmd*' \) -not -iwholename './pkg/api*'); do
2121
echo for directory ${d} ...
2222
gometalinter \
23-
--exclude='error return value not checked.*(Close|Log|Print).*\(errcheck\)$' \
23+
--exclude='error return value not checked.*(Close|Log|Print|Fprint).*\(errcheck\)$' \
2424
--exclude='.*_test\.go:.*error return value not checked.*\(errcheck\)$' \
2525
--exclude='duplicate of.*_test.go.*\(dupl\)$' \
2626
--exclude='.*/mock_.*\.go:.*\(golint\)$' \

pkg/config/config.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ type PluginConfig struct {
9696
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
9797
// EnableTLSStreaming indicates to enable the TLS streaming support.
9898
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
99+
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
100+
// Log line longer than the limit will be split into multiple lines. Non-positive
101+
// value means no limit.
102+
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
99103
}
100104

101105
// Config contains all configurations for cri server.
@@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
129133
Root: "",
130134
},
131135
},
132-
StreamServerAddress: "",
133-
StreamServerPort: "10010",
134-
EnableSelinux: false,
135-
EnableTLSStreaming: false,
136-
SandboxImage: "k8s.gcr.io/pause:3.1",
137-
StatsCollectPeriod: 10,
138-
SystemdCgroup: false,
136+
StreamServerAddress: "",
137+
StreamServerPort: "10010",
138+
EnableSelinux: false,
139+
EnableTLSStreaming: false,
140+
SandboxImage: "k8s.gcr.io/pause:3.1",
141+
StatsCollectPeriod: 10,
142+
SystemdCgroup: false,
143+
MaxContainerLogLineSize: 16 * 1024,
139144
Registry: Registry{
140145
Mirrors: map[string]Mirror{
141146
"docker.io": {

pkg/ioutil/write_closer.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ limitations under the License.
1616

1717
package ioutil
1818

19-
import "io"
19+
import (
20+
"io"
21+
"sync"
22+
)
2023

2124
// writeCloseInformer wraps passed in write closer with a close channel.
2225
// Caller could wait on the close channel for the write closer to be
@@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) {
6669
func (n *nopWriteCloser) Close() error {
6770
return nil
6871
}
72+
73+
// serialWriteCloser wraps a write closer and makes sure all writes
74+
// are done in serial.
75+
// Parallel write won't intersect with each other. Use case:
76+
// 1) Pipe: Write content longer than PIPE_BUF.
77+
// See http://man7.org/linux/man-pages/man7/pipe.7.html
78+
// 2) <3.14 Linux Kernel: write is not atomic
79+
// See http://man7.org/linux/man-pages/man2/write.2.html
80+
type serialWriteCloser struct {
81+
mu sync.Mutex
82+
wc io.WriteCloser
83+
}
84+
85+
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
86+
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
87+
return &serialWriteCloser{wc: wc}
88+
}
89+
90+
// Write writes a group of byte arrays in order atomically.
91+
func (s *serialWriteCloser) Write(data []byte) (int, error) {
92+
s.mu.Lock()
93+
defer s.mu.Unlock()
94+
return s.wc.Write(data)
95+
}
96+
97+
// Close closes the write closer.
98+
func (s *serialWriteCloser) Close() error {
99+
s.mu.Lock()
100+
defer s.mu.Unlock()
101+
return s.wc.Close()
102+
}

pkg/ioutil/write_closer_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@ limitations under the License.
1717
package ioutil
1818

1919
import (
20+
"io/ioutil"
21+
"os"
22+
"sort"
23+
"strconv"
24+
"strings"
25+
"sync"
2026
"testing"
2127

2228
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
2330
)
2431

2532
func TestWriteCloseInformer(t *testing.T) {
@@ -47,3 +54,55 @@ func TestWriteCloseInformer(t *testing.T) {
4754
assert.Fail(t, "write closer not closed")
4855
}
4956
}
57+
58+
func TestSerialWriteCloser(t *testing.T) {
59+
const (
60+
// Test 10 times to make sure it always pass.
61+
testCount = 10
62+
63+
goroutine = 10
64+
dataLen = 100000
65+
)
66+
for n := 0; n < testCount; n++ {
67+
testData := make([][]byte, goroutine)
68+
for i := 0; i < goroutine; i++ {
69+
testData[i] = []byte(repeatNumber(i, dataLen) + "\n")
70+
}
71+
72+
f, err := ioutil.TempFile("/tmp", "serial-write-closer")
73+
require.NoError(t, err)
74+
defer os.RemoveAll(f.Name())
75+
defer f.Close()
76+
wc := NewSerialWriteCloser(f)
77+
defer wc.Close()
78+
79+
// Write data in parallel
80+
var wg sync.WaitGroup
81+
wg.Add(goroutine)
82+
for i := 0; i < goroutine; i++ {
83+
go func(id int) {
84+
n, err := wc.Write(testData[id])
85+
assert.NoError(t, err)
86+
assert.Equal(t, dataLen+1, n)
87+
wg.Done()
88+
}(i)
89+
}
90+
wg.Wait()
91+
wc.Close()
92+
93+
// Check test result
94+
content, err := ioutil.ReadFile(f.Name())
95+
require.NoError(t, err)
96+
resultData := strings.Split(strings.TrimSpace(string(content)), "\n")
97+
require.Len(t, resultData, goroutine)
98+
sort.Strings(resultData)
99+
for i := 0; i < goroutine; i++ {
100+
expected := repeatNumber(i, dataLen)
101+
assert.Equal(t, expected, resultData[i])
102+
}
103+
}
104+
}
105+
106+
func repeatNumber(num, count int) string {
107+
return strings.Repeat(strconv.Itoa(num), count)
108+
}

pkg/server/container_log_reopen.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo
3636
}
3737

3838
// Create new container logger and replace the existing ones.
39-
stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty())
39+
stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty())
4040
if err != nil {
4141
return nil, err
4242
}

pkg/server/container_start.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package server
1818

1919
import (
2020
"io"
21+
"os"
2122
"time"
2223

2324
"github.com/containerd/containerd"
@@ -29,6 +30,7 @@ import (
2930
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
3031

3132
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
33+
cioutil "github.com/containerd/cri/pkg/ioutil"
3234
cio "github.com/containerd/cri/pkg/server/io"
3335
containerstore "github.com/containerd/cri/pkg/store/container"
3436
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
@@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context,
9799
}
98100

99101
ioCreation := func(id string) (_ containerdio.IO, err error) {
100-
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty())
102+
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
101103
if err != nil {
102104
return nil, errors.Wrap(err, "failed to create container loggers")
103105
}
104-
defer func() {
105-
if err != nil {
106-
if stdoutWC != nil {
107-
stdoutWC.Close()
108-
}
109-
if stderrWC != nil {
110-
stderrWC.Close()
111-
}
112-
}
113-
}()
114106
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
115107
cntr.IO.Pipe()
116108
return cntr.IO, nil
@@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context,
142134
return nil
143135
}
144136

145-
// Create container loggers and return write closer for stdout and stderr.
146-
func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
137+
// createContainerLoggers creates container loggers and return write closer for stdout and stderr.
138+
func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
147139
if logPath != "" {
148140
// Only generate container log when log path is specified.
149-
if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
150-
return nil, nil, errors.Wrap(err, "failed to start container stdout logger")
141+
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
142+
if err != nil {
143+
return nil, nil, errors.Wrap(err, "failed to create and open log file")
151144
}
152145
defer func() {
153146
if err != nil {
154-
stdout.Close()
147+
f.Close()
155148
}
156149
}()
150+
var stdoutCh, stderrCh <-chan struct{}
151+
wc := cioutil.NewSerialWriteCloser(f)
152+
stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize)
157153
// Only redirect stderr when there is no tty.
158154
if !tty {
159-
if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
160-
return nil, nil, errors.Wrap(err, "failed to start container stderr logger")
161-
}
155+
stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize)
162156
}
157+
go func() {
158+
if stdoutCh != nil {
159+
<-stdoutCh
160+
}
161+
if stderrCh != nil {
162+
<-stderrCh
163+
}
164+
logrus.Debugf("Finish redirecting log file %q, closing it", logPath)
165+
f.Close()
166+
}()
163167
} else {
164168
stdout = cio.NewDiscardLogger()
165169
stderr = cio.NewDiscardLogger()

0 commit comments

Comments
 (0)