Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pkg/server/agents/agents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agents

import "io"

// StreamType is the type of the stream, stdout/stderr.
type StreamType string

const (
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)

// Agent is the a running agent perform a specific task, e.g. redirect and
// decorate log, redirect stream etc.
type Agent interface {
// Start starts the logger.
Start() error
}

// AgentFactory is the factory to create required agents.
type AgentFactory interface {
// NewSandboxLogger creates a sandbox logging agent.
NewSandboxLogger(io.ReadCloser) Agent
// NewContainerLogger creates a container logging agent.
NewContainerLogger(string, StreamType, io.ReadCloser) Agent
}

type agentFactory struct{}

// NewAgentFactory creates a new agent factory.
func NewAgentFactory() AgentFactory {
return &agentFactory{}
}
116 changes: 116 additions & 0 deletions pkg/server/agents/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agents

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"

"github.com/golang/glog"
)

const (
// delimiter used in CRI logging format.
delimiter = ' '
// eof is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
)

// sandboxLogger is the log agent used for sandbox.
// It discards sandbox all output for now.
type sandboxLogger struct {
rc io.ReadCloser
}

func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent {
return &sandboxLogger{rc: rc}
}

func (s *sandboxLogger) Start() error {
go func() {
// Discard the output for now.
io.Copy(ioutil.Discard, s.rc) // nolint: errcheck
s.rc.Close()
}()
return nil
}

// containerLogger is the log agent used for container.
// It redirect container log into CRI log file, and decorate the log
// line into CRI defined format.
type containerLogger struct {
path string
stream StreamType
rc io.ReadCloser
}

func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent {
return &containerLogger{
path: path,
stream: stream,
rc: rc,
}
}

func (c *containerLogger) Start() error {
glog.V(4).Infof("Start reading log file %q", c.path)
wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", c.path, err)
}
go c.redirectLogs(wc)
return nil
}

func (c *containerLogger) redirectLogs(wc io.WriteCloser) {
defer c.rc.Close()
defer wc.Close()
streamBytes := []byte(c.stream)
delimiterBytes := []byte{delimiter}
r := bufio.NewReaderSize(c.rc, bufSize)
for {
// TODO(random-liu): Better define CRI log format, and escape newline in log.
lineBytes, _, err := r.ReadLine()
if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", c.path)
return
}
if err != nil {
glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err)
return
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
glog.Errorf("Fail to write log line %q: %v", data, err)
}
// Continue on write error to drain the input.
}
}
90 changes: 90 additions & 0 deletions pkg/server/agents/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agents

import (
"bytes"
"io/ioutil"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// writeCloserBuffer is a writecloser wrapper for bytes.Buffer
// with a nop closer.
type writeCloserBuffer struct {
*bytes.Buffer
}

// nop close
func (*writeCloserBuffer) Close() error { return nil }

func TestRedirectLogs(t *testing.T) {
f := NewAgentFactory()
// f.NewContainerLogger(
for desc, test := range map[string]struct {
input string
stream StreamType
content []string
}{
"stdout log": {
input: "test stdout log 1\ntest stdout log 2",
stream: Stdout,
content: []string{
"test stdout log 1",
"test stdout log 2",
},
},
"stderr log": {
input: "test stderr log 1\ntest stderr log 2",
stream: Stderr,
content: []string{
"test stderr log 1",
"test stderr log 2",
},
},
"long log": {
input: strings.Repeat("a", bufSize+10) + "\n",
stream: Stdout,
content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", 10),
},
},
} {
t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input))
c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger)
wc := &writeCloserBuffer{bytes.NewBuffer(nil)}
c.redirectLogs(wc)
output := wc.String()
lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n
assert.Len(t, lines, len(test.content))
for i := range lines {
fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3)
require.Len(t, fields, 3)
_, err := time.Parse(timestampFormat, fields[0])
assert.NoError(t, err)
assert.EqualValues(t, test.stream, fields[1])
assert.Equal(t, test.content[i], fields[2])
}
}
}
50 changes: 50 additions & 0 deletions pkg/server/agents/testing/fake_agent_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testing

import (
"io"

"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)

// FakeAgent is a fake agent for test.
type FakeAgent struct{}

// Start always return nil not.
// TODO(random-liu): Inject error and test error.
func (f *FakeAgent) Start() error {
return nil
}

// FakeAgentFactory is a fake agent factory for test.
type FakeAgentFactory struct{}

// NewFakeAgentFactory creates fake agent factory.
func NewFakeAgentFactory() agents.AgentFactory {
return &FakeAgentFactory{}
}

// NewSandboxLogger creates a fake agent as sandbox logger.
func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent {
return &FakeAgent{}
}

// NewContainerLogger creates a fake agent as container logger.
func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent {
return &FakeAgent{}
}
24 changes: 14 additions & 10 deletions pkg/server/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/containerd/containerd/api/services/execution"
Expand All @@ -35,6 +36,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"

"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)

// StartContainer starts the container.
Expand Down Expand Up @@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
w.Close()
}(stdinPipe)
}
go func(r io.ReadCloser) {
io.Copy(os.Stdout, r) // nolint: errcheck
r.Close()
}(stdoutPipe)
// Only redirect stderr when there is no tty.
if !config.GetTty() {
go func(r io.ReadCloser) {
io.Copy(os.Stderr, r) // nolint: errcheck
r.Close()
}(stderrPipe)
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stderr logger: %v", err)
}
}
}

// Get rootfs mounts.
Expand Down
Loading