Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8
github.com/ugorji/go/codec v1.1.7
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c h1:4GYkPhjcYLPrPAnoxHVQlH/xcXtWN8pEgqBnHrPAs8c=
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c/go.mod h1:xd7qpr5uPMNy4hsRJ5JEBXA8tJjTFmUI1soCjlCIgAE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
3 changes: 1 addition & 2 deletions images/fluentd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0
RUN fluent-gem install fluent-plugin-grep
RUN fluent-gem install fluent-plugin-route
RUN fluent-gem install fluent-plugin-record-modifier --no-document
10 changes: 6 additions & 4 deletions manager/manifests/fluentd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,22 @@ data:
</filter>

<filter **>
@type record_transformer
enable_ruby
@type record_modifier
<record>
default_group_name "#{ENV['LOG_GROUP_NAME']}"
group_name ${record.dig("kubernetes", "labels", "logGroupName") || record["default_group_name"]}
stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")}
log ${record.dig("log").rstrip}
</record>
remove_keys kubernetes,docker,stream
remove_keys kubernetes,docker,stream,default_group_name
</filter>
<match **>
@type cloudwatch_logs
region "#{ENV['AWS_REGION']}"
log_group_name "#{ENV['LOG_GROUP_NAME']}"
log_group_name_key group_name
log_stream_name_key stream_name
remove_log_stream_name_key true
remove_log_group_name_key true
auto_create_stream true
<buffer>
flush_interval 2
Expand Down
5 changes: 5 additions & 0 deletions pkg/operator/api/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ func (ctx *Context) VisibleResourceByNameAndType(name string, resourceTypeStr st
return nil, resource.ErrorInvalidType(resourceTypeStr)
}

func (ctx *Context) LogGroupName(apiName string) string {
name := ctx.CortexConfig.LogGroup + "." + ctx.App.Name + "." + apiName
return name
}

func (ctx *Context) Validate() error {
return nil
}
2 changes: 2 additions & 0 deletions pkg/operator/workloads/api_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func tfAPISpec(
"resourceID": ctx.APIs[api.Name].ID,
"workloadID": workloadID,
"userFacing": "true",
"logGroupName": ctx.LogGroupName(api.Name),
},
Annotations: map[string]string{
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",
Expand Down Expand Up @@ -482,6 +483,7 @@ func onnxAPISpec(
"resourceID": ctx.APIs[api.Name].ID,
"workloadID": workloadID,
"userFacing": "true",
"logGroupName": ctx.LogGroupName(api.Name),
},
Annotations: map[string]string{
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",
Expand Down
190 changes: 139 additions & 51 deletions pkg/operator/workloads/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/gorilla/websocket"
"gopkg.in/karalabe/cookiejar.v1/collections/deque"

awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/operator/api/context"
Expand All @@ -38,9 +40,41 @@ const (
socketMaxMessageSize = 8192

maxLogLinesPerRequest = 500
pollPeriod = 250 // milliseconds
pollPeriod = 250 * time.Millisecond
streamRefreshPeriod = 2 * time.Second
)

type eventCache struct {
size int
seen strset.Set
eventQueue *deque.Deque
}

func newEventCache(cacheSize int) eventCache {
return eventCache{
size: cacheSize,
seen: strset.New(),
eventQueue: deque.New(),
}
}

func (c *eventCache) Has(eventID string) bool {
return c.seen.Has(eventID)
}

func (c *eventCache) PopLeft() {
eventID := c.eventQueue.PopLeft().(string)
c.seen.Remove(eventID)
}

func (c *eventCache) Add(eventID string) {
if c.eventQueue.Size() == c.size {
c.PopLeft()
}
c.seen.Add(eventID)
c.eventQueue.PushRight(eventID)
}

func ReadLogs(appName string, podLabels map[string]string, socket *websocket.Conn) {
podCheckCancel := make(chan struct{})
defer close(podCheckCancel)
Expand All @@ -67,14 +101,29 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
timer := time.NewTimer(0)
defer timer.Stop()

lastTimestamp := int64(0)
previousEvents := strset.New()
lastLogTime := time.Now()
lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod)

logStreamNames := []string{}
logStreamSet := strset.New()
var currentContextID string
var prefix string
var ctx *context.Context
var err error

var ctx = CurrentContext(appName)
eventCache := newEventCache(10000)

if ctx == nil {
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
return
}

logGroupName, err := getLogGroupName(ctx, podLabels)
if err != nil {
writeAndCloseSocket(socket, err.Error()) // unexpected
return
}

for {
select {
case <-podCheckCancel:
Expand All @@ -83,8 +132,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
ctx = CurrentContext(appName)

if ctx == nil {
writeString(socket, "\ndeployment "+appName+" not found")
closeSocket(socket)
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
continue
}

Expand All @@ -93,105 +141,140 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
if podLabels["workloadType"] == resource.APIType.String() {
apiName := podLabels["apiName"]
if _, ok := ctx.APIs[apiName]; !ok {
writeString(socket, "\napi "+apiName+" was not found in latest deployment")
closeSocket(socket)
writeAndCloseSocket(socket, "\napi "+apiName+" was not found in latest deployment")
continue
}
writeString(socket, "\na new deployment was detected, streaming logs from the latest deployment")
} else {
writeString(socket, "\nlogging non-api workloads is not supported") // unexpected
closeSocket(socket)
writeAndCloseSocket(socket, "\nlogging non-api workloads is not supported") // unexpected
continue
}
} else {
lastTimestamp = ctx.CreatedEpoch * 1000
}

if podLabels["workloadType"] == resource.APIType.String() {
podLabels["workloadID"] = ctx.APIs[podLabels["apiName"]].WorkloadID
lastLogTime, _ = getPodStartTime(podLabels)
}

currentContextID = ctx.ID

writeString(socket, "\nretrieving logs...")
prefix = ""
}

if len(prefix) == 0 {
prefix, err = getPrefix(podLabels)
if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) {
newLogStreamSet, err := getLogStreams(logGroupName)
if err != nil {
writeString(socket, err.Error())
closeSocket(socket)
writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error())
continue
}

if !logStreamSet.IsEqual(newLogStreamSet) {
lastLogTime = lastLogTime.Add(-streamRefreshPeriod)
logStreamNames = newLogStreamSet.Slice()
}

lastLogStreamUpdateTime = time.Now()
}

if len(prefix) == 0 {
if len(logStreamNames) == 0 {
timer.Reset(pollPeriod)
continue
}

endTime := time.Now().Unix() * 1000
startTime := lastTimestamp - pollPeriod
endTime := TimeToMillis(time.Now())

logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String(config.Cortex.LogGroup),
LogStreamNamePrefix: aws.String(prefix),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(endTime), // requires milliseconds
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
LogGroupName: aws.String(logGroupName),
LogStreamNames: aws.StringSlice(logStreamNames),
StartTime: aws.Int64(TimeToMillis(lastLogTime.Add(-pollPeriod))),
EndTime: aws.Int64(endTime), // requires milliseconds
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
})

if err != nil {
if !awslib.CheckErrCode(err, "ResourceNotFoundException") {
writeString(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
closeSocket(socket)
if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
writeAndCloseSocket(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
continue
}
}

newEvents := strset.New()
lastLogTimestampMillis := TimeToMillis(lastLogTime)
for _, logEvent := range logEventsOutput.Events {
var log FluentdLog
json.Unmarshal([]byte(*logEvent.Message), &log)

if !previousEvents.Has(*logEvent.EventId) {
if !eventCache.Has(*logEvent.EventId) {
socket.WriteMessage(websocket.TextMessage, []byte(log.Log))
if *logEvent.Timestamp > lastTimestamp {
lastTimestamp = *logEvent.Timestamp
if *logEvent.Timestamp > lastLogTimestampMillis {
lastLogTimestampMillis = *logEvent.Timestamp
}
eventCache.Add(*logEvent.EventId)
}
newEvents.Add(*logEvent.EventId)
}

lastLogTime = MillisToTime(lastLogTimestampMillis)
if len(logEventsOutput.Events) == maxLogLinesPerRequest {
socket.WriteMessage(websocket.TextMessage, []byte("---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----"))
lastTimestamp = endTime
writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----")
lastLogTime = MillisToTime(endTime)
}

previousEvents = newEvents
timer.Reset(pollPeriod * time.Millisecond)
timer.Reset(pollPeriod)
}
}
}

func MillisToTime(epochMillis int64) time.Time {
seconds := epochMillis / 1000
millis := epochMillis % 1000
return time.Unix(seconds, millis*int64(time.Millisecond))
}

func TimeToMillis(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}

func getLogStreams(logGroupName string) (strset.Set, error) {
describeLogStreamsOnput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
Descending: aws.Bool(true),
LogGroupName: aws.String(logGroupName),
OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime),
Limit: aws.Int64(50),
})
if err != nil {
if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
return nil, err
}
return nil, nil
}

streams := strset.New()

for _, stream := range describeLogStreamsOnput.LogStreams {
streams.Add(*stream.LogStreamName)
}
return streams, nil
}

func getPrefix(searchLabels map[string]string) (string, error) {
func getPodStartTime(searchLabels map[string]string) (time.Time, error) {
pods, err := config.Kubernetes.ListPodsByLabels(searchLabels)
if err != nil {
return "", err
return time.Now(), err
}

if len(pods) == 0 {
return "", nil
return time.Now(), nil
}

podLabels := pods[0].GetLabels()
if apiName, ok := podLabels["apiName"]; ok {
if podTemplateHash, ok := podLabels["pod-template-hash"]; ok {
return internalAPIName(apiName, podLabels["appName"]) + "-" + podTemplateHash, nil
startTime := pods[0].CreationTimestamp.Time
for _, pod := range pods[1:] {
if pod.CreationTimestamp.Time.Before(startTime) {
startTime = pod.CreationTimestamp.Time
}
return "", nil // unexpected, pod template hash not set yet
}
return pods[0].Name, nil // unexpected, logging non api resources

return startTime, nil
}

func getLogGroupName(ctx *context.Context, searchLabels map[string]string) (string, error) {
if searchLabels["workloadType"] == resource.APIType.String() {
return ctx.LogGroupName(searchLabels["apiName"]), nil
}
return "nil", errors.New("unsupported workload type") // unexpected
}

func writeString(socket *websocket.Conn, message string) {
Expand All @@ -203,3 +286,8 @@ func closeSocket(socket *websocket.Conn) {
socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
time.Sleep(socketCloseGracePeriod)
}

func writeAndCloseSocket(socket *websocket.Conn, message string) {
writeString(socket, message)
closeSocket(socket)
}