Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows:
# enable_tls_streaming enables the TLS streaming support.
enable_tls_streaming = false

# max_container_log_line_size is the maximum log line size in bytes for a container.
# Log line longer than the limit will be split into multiple lines. -1 means no
# limit.
max_container_log_line_size = 16384

# "plugins.cri.containerd" contains config related to containerd
[plugins.cri.containerd]

Expand Down
2 changes: 1 addition & 1 deletion hack/verify-lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set -o pipefail
for d in $(find . -type d -a \( -iwholename './pkg*' -o -iwholename './cmd*' \) -not -iwholename './pkg/api*'); do
echo for directory ${d} ...
gometalinter \
--exclude='error return value not checked.*(Close|Log|Print).*\(errcheck\)$' \
--exclude='error return value not checked.*(Close|Log|Print|Fprint).*\(errcheck\)$' \
--exclude='.*_test\.go:.*error return value not checked.*\(errcheck\)$' \
--exclude='duplicate of.*_test.go.*\(dupl\)$' \
--exclude='.*/mock_.*\.go:.*\(golint\)$' \
Expand Down
113 changes: 113 additions & 0 deletions integration/container_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2018 The containerd Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func TestLongContainerLog(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log")
require.NoError(t, err)
defer os.RemoveAll(testPodLogDir)

t.Log("Create a sandbox with log directory")
sbConfig := PodSandboxConfig("sandbox", "long-container-log",
WithPodLogDirectory(testPodLogDir),
)
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

const (
testImage = "busybox"
containerName = "test-container"
)
t.Logf("Pull test image %q", testImage)
img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()

t.Log("Create a container with log path")
config, err := CRIConfig()
require.NoError(t, err)
maxSize := config.MaxContainerLogLineSize
shortLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize-1, "a")
maxLenLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize, "b")
longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c")
cnConfig := ContainerConfig(
containerName,
"busybox",
WithCommand("sh", "-c",
fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)

t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))

t.Log("Wait for container to finish running")
require.NoError(t, Eventually(func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
if s.GetState() == runtime.ContainerState_CONTAINER_EXITED {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))

t.Log("Check container log")
content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName))
assert.NoError(t, err)
checkContainerLog(t, string(content), []string{
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("a", maxSize-1)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("b", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, strings.Repeat("c", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, "c"),
})
}

func checkContainerLog(t *testing.T, log string, messages []string) {
lines := strings.Split(strings.TrimSpace(log), "\n")
require.Len(t, lines, len(messages), "log line number should match")
for i, line := range lines {
parts := strings.SplitN(line, " ", 2)
require.Len(t, parts, 2)
_, err := time.Parse(time.RFC3339Nano, parts[0])
assert.NoError(t, err, "timestamp should be in RFC3339Nano format")
assert.Equal(t, messages[i], parts[1], "log content should match")
}
}
20 changes: 10 additions & 10 deletions integration/container_update_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func checkMemoryLimit(t *testing.T, spec *runtimespec.Spec, memLimit int64) {
}

func TestUpdateContainerResources(t *testing.T) {
t.Logf("Create a sandbox")
t.Log("Create a sandbox")
sbConfig := PodSandboxConfig("sandbox", "update-container-resources")
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
Expand All @@ -46,7 +46,7 @@ func TestUpdateContainerResources(t *testing.T) {
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

t.Logf("Create a container with memory limit")
t.Log("Create a container with memory limit")
cnConfig := ContainerConfig(
"container",
pauseImage,
Expand All @@ -57,48 +57,48 @@ func TestUpdateContainerResources(t *testing.T) {
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
container, err := containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err := container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 2*1024*1024)

t.Logf("Update container memory limit after created")
t.Log("Update container memory limit after created")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 4 * 1024 * 1024,
})
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 4*1024*1024)

t.Logf("Start the container")
t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))
task, err := container.Task(context.Background(), nil)
require.NoError(t, err)

t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid())))
require.NoError(t, err)
stat, err := cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(4*1024*1024), stat.Memory.Usage.Limit)

t.Logf("Update container memory limit after started")
t.Log("Update container memory limit after started")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 8 * 1024 * 1024,
})
require.NoError(t, err)

t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 8*1024*1024)

t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
stat, err = cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(8*1024*1024), stat.Memory.Usage.Limit)
Expand Down
84 changes: 64 additions & 20 deletions integration/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package integration

import (
"context"
"encoding/json"
"flag"
"fmt"
"os/exec"
Expand All @@ -27,12 +29,15 @@ import (
"github.com/containerd/containerd"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/remote"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"

api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/client"
criconfig "github.com/containerd/cri/pkg/config"
"github.com/containerd/cri/pkg/constants"
"github.com/containerd/cri/pkg/util"
)
Expand Down Expand Up @@ -97,6 +102,7 @@ func ConnectDaemons() error {
// Opts sets specific information in pod sandbox config.
type PodSandboxOpts func(*runtime.PodSandboxConfig)

// Set host network.
func WithHostNetwork(p *runtime.PodSandboxConfig) {
if p.Linux == nil {
p.Linux = &runtime.LinuxPodSandboxConfig{}
Expand All @@ -111,6 +117,13 @@ func WithHostNetwork(p *runtime.PodSandboxConfig) {
}
}

// Add pod log directory.
func WithPodLogDirectory(dir string) PodSandboxOpts {
return func(p *runtime.PodSandboxConfig) {
p.LogDirectory = dir
}
}

// PodSandboxConfig generates a pod sandbox config for test.
func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig {
config := &runtime.PodSandboxConfig{
Expand All @@ -134,52 +147,59 @@ func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandb
type ContainerOpts func(*runtime.ContainerConfig)

func WithTestLabels() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Labels = map[string]string{"key": "value"}
return func(c *runtime.ContainerConfig) {
c.Labels = map[string]string{"key": "value"}
}
}

func WithTestAnnotations() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Annotations = map[string]string{"a.b.c": "test"}
return func(c *runtime.ContainerConfig) {
c.Annotations = map[string]string{"a.b.c": "test"}
}
}

// Add container resource limits.
func WithResources(r *runtime.LinuxContainerResources) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
cf.Linux.Resources = r
c.Linux.Resources = r
}
}

// Add container command.
func WithCommand(c string, args ...string) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Command = []string{c}
cf.Args = args
func WithCommand(cmd string, args ...string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.Command = []string{cmd}
c.Args = args
}
}

// Add pid namespace mode.
func WithPidNamespace(mode runtime.NamespaceMode) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
if cf.Linux.SecurityContext == nil {
cf.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
if c.Linux.SecurityContext == nil {
c.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
}
if cf.Linux.SecurityContext.NamespaceOptions == nil {
cf.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
if c.Linux.SecurityContext.NamespaceOptions == nil {
c.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
}
cf.Linux.SecurityContext.NamespaceOptions.Pid = mode
c.Linux.SecurityContext.NamespaceOptions.Pid = mode
}

}

// Add container log path.
func WithLogPath(path string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.LogPath = path
}
}

// ContainerConfig creates a container config given a name and image name
// and additional container config options
func ContainerConfig(name, image string, opts ...ContainerOpts) *runtime.ContainerConfig {
Expand Down Expand Up @@ -244,3 +264,27 @@ func PidOf(name string) (int, error) {
}
return strconv.Atoi(output)
}

// CRIConfig gets current cri config from containerd.
func CRIConfig() (*criconfig.Config, error) {
addr, dialer, err := kubeletutil.GetAddressAndDialer(*criEndpoint)
if err != nil {
return nil, errors.Wrap(err, "failed to get dialer")
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer))
if err != nil {
return nil, errors.Wrap(err, "failed to connect cri endpoint")
}
client := runtime.NewRuntimeServiceClient(conn)
resp, err := client.Status(ctx, &runtime.StatusRequest{Verbose: true})
if err != nil {
return nil, errors.Wrap(err, "failed to get status")
}
config := &criconfig.Config{}
if err := json.Unmarshal([]byte(resp.Info["config"]), config); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal config")
}
return config, nil
}
19 changes: 12 additions & 7 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type PluginConfig struct {
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit.
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
}

// Config contains all configurations for cri server.
Expand Down Expand Up @@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
Root: "",
},
},
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{
Mirrors: map[string]Mirror{
"docker.io": {
Expand Down
Loading