diff --git a/dev/versions.md b/dev/versions.md index 9c3b37173e..c04bb4af42 100644 --- a/dev/versions.md +++ b/dev/versions.md @@ -151,6 +151,7 @@ Note: overriding horizontal-pod-autoscaler-sync-period on EKS is currently not s 1. Find the latest release on [Dockerhub](https://hub.docker.com/r/fluent/fluentd-kubernetes-daemonset/) 1. Update the base image version in `images/fluentd/Dockerfile` +1. Update record-modifier in `images/fluentd/Dockerfile` to the latest version [here](https://github.com/repeatedly/fluent-plugin-record-modifier/blob/master/VERSION) 1. Update `fluentd.yaml` as necessary (make sure to maintain all Cortex environment variables) ## Statsd diff --git a/docs/cluster/uninstall.md b/docs/cluster/uninstall.md index fd860dc7d7..35ffad1c83 100644 --- a/docs/cluster/uninstall.md +++ b/docs/cluster/uninstall.md @@ -43,5 +43,5 @@ aws s3 ls aws s3 rb --force s3:// # Delete the log group -aws logs delete-log-group --log-group-name cortex --region us-west-2 +aws logs describe-log-groups --log-group-name-prefix= --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} ``` diff --git a/go.mod b/go.mod index ac44a898bd..6dfc886f0d 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8 github.com/ugorji/go/codec v1.1.7 github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca + gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 k8s.io/api v0.0.0-20190620084959-7cf5895f2711 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab diff --git a/go.sum b/go.sum index cfd19ec971..58996425df 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw= +gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/images/fluentd/Dockerfile b/images/fluentd/Dockerfile index ca0c658eae..2a75e5b44d 100644 --- a/images/fluentd/Dockerfile +++ b/images/fluentd/Dockerfile @@ -1,3 +1,2 @@ FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0 -RUN fluent-gem install fluent-plugin-grep -RUN fluent-gem install fluent-plugin-route +RUN fluent-gem install fluent-plugin-record-modifier -v 2.0.1 --no-document diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index f9b16fcac6..6b4844dea0 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -83,9 +83,9 @@ data: - @type record_transformer - enable_ruby + @type record_modifier + group_name ${record.dig("kubernetes", "labels", "logGroupName") || ENV['LOG_GROUP_NAME']} stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} log ${record.dig("log").rstrip} @@ -94,9 +94,10 @@ data: @type cloudwatch_logs region "#{ENV['AWS_REGION']}" - log_group_name "#{ENV['LOG_GROUP_NAME']}" + log_group_name_key group_name log_stream_name_key stream_name remove_log_stream_name_key true + remove_log_group_name_key true auto_create_stream true flush_interval 2 diff --git a/pkg/lib/time/time.go b/pkg/lib/time/time.go index ac2735abff..6de347f0d8 100644 --- a/pkg/lib/time/time.go +++ b/pkg/lib/time/time.go @@ -109,3 +109,13 @@ func LocalHourNow() string { func OlderThanSeconds(t time.Time, secs float64) bool { return time.Since(t).Seconds() > secs } + +func MillisToTime(epochMillis int64) time.Time { + seconds := epochMillis / 1000 + millis := epochMillis % 1000 + return time.Unix(seconds, millis*int64(time.Millisecond)) +} + +func ToMillis(t time.Time) int64 { + return t.UnixNano() / int64(time.Millisecond) +} diff --git a/pkg/operator/api/context/context.go b/pkg/operator/api/context/context.go index f073679f49..c2f48d9cf4 100644 --- a/pkg/operator/api/context/context.go +++ b/pkg/operator/api/context/context.go @@ -209,6 +209,11 @@ func (ctx *Context) VisibleResourceByNameAndType(name string, resourceTypeStr st return nil, resource.ErrorInvalidType(resourceTypeStr) } +func (ctx *Context) LogGroupName(apiName string) string { + name := ctx.CortexConfig.LogGroup + "." + ctx.App.Name + "." + apiName + return name +} + func (ctx *Context) Validate() error { return nil } diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index 91dfac4973..e229959908 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -307,6 +307,7 @@ func tfAPISpec( "resourceID": ctx.APIs[api.Name].ID, "workloadID": workloadID, "userFacing": "true", + "logGroupName": ctx.LogGroupName(api.Name), }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", @@ -482,6 +483,7 @@ func onnxAPISpec( "resourceID": ctx.APIs[api.Name].ID, "workloadID": workloadID, "userFacing": "true", + "logGroupName": ctx.LogGroupName(api.Name), }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 15ccc31ed1..c9f96924a8 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -23,10 +23,13 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/gorilla/websocket" + "gopkg.in/karalabe/cookiejar.v2/collections/deque" awslib "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" + libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/api/context" "github.com/cortexlabs/cortex/pkg/operator/api/resource" "github.com/cortexlabs/cortex/pkg/operator/config" @@ -37,10 +40,40 @@ const ( socketCloseGracePeriod = 10 * time.Second socketMaxMessageSize = 8192 + maxCacheSize = 10000 maxLogLinesPerRequest = 500 - pollPeriod = 250 // milliseconds + maxStreamsPerRequest = 50 + pollPeriod = 250 * time.Millisecond + streamRefreshPeriod = 2 * time.Second ) +type eventCache struct { + size int + seen strset.Set + eventQueue *deque.Deque +} + +func newEventCache(cacheSize int) eventCache { + return eventCache{ + size: cacheSize, + seen: strset.New(), + eventQueue: deque.New(), + } +} + +func (c *eventCache) Has(eventID string) bool { + return c.seen.Has(eventID) +} + +func (c *eventCache) Add(eventID string) { + if c.eventQueue.Size() == c.size { + eventID := c.eventQueue.PopLeft().(string) + c.seen.Remove(eventID) + } + c.seen.Add(eventID) + c.eventQueue.PushRight(eventID) +} + func ReadLogs(appName string, podLabels map[string]string, socket *websocket.Conn) { podCheckCancel := make(chan struct{}) defer close(podCheckCancel) @@ -67,14 +100,29 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel timer := time.NewTimer(0) defer timer.Stop() - lastTimestamp := int64(0) - previousEvents := strset.New() + lastLogTime := time.Now() + lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod) + + logStreamNames := strset.New() var currentContextID string var prefix string - var ctx *context.Context var err error + var ctx = CurrentContext(appName) + eventCache := newEventCache(maxCacheSize) + + if ctx == nil { + writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") + return + } + + logGroupName, err := getLogGroupName(ctx, podLabels) + if err != nil { + writeAndCloseSocket(socket, err.Error()) // unexpected + return + } + for { select { case <-podCheckCancel: @@ -83,8 +131,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel ctx = CurrentContext(appName) if ctx == nil { - writeString(socket, "\ndeployment "+appName+" not found") - closeSocket(socket) + writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") continue } @@ -93,105 +140,129 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel if podLabels["workloadType"] == resource.APIType.String() { apiName := podLabels["apiName"] if _, ok := ctx.APIs[apiName]; !ok { - writeString(socket, "\napi "+apiName+" was not found in latest deployment") - closeSocket(socket) + writeAndCloseSocket(socket, "\napi "+apiName+" was not found in latest deployment") continue } writeString(socket, "\na new deployment was detected, streaming logs from the latest deployment") } else { - writeString(socket, "\nlogging non-api workloads is not supported") // unexpected - closeSocket(socket) + writeAndCloseSocket(socket, "\nlogging non-api workloads is not supported") // unexpected continue } } else { - lastTimestamp = ctx.CreatedEpoch * 1000 - } - - if podLabels["workloadType"] == resource.APIType.String() { - podLabels["workloadID"] = ctx.APIs[podLabels["apiName"]].WorkloadID + lastLogTime, _ = getPodStartTime(podLabels) } currentContextID = ctx.ID - writeString(socket, "\nretrieving logs...") - prefix = "" } - if len(prefix) == 0 { - prefix, err = getPrefix(podLabels) + if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) { + newLogStreamNames, err := getLogStreams(logGroupName) if err != nil { - writeString(socket, err.Error()) - closeSocket(socket) + writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error()) continue } + + if !logStreamNames.IsEqual(newLogStreamNames) { + lastLogTime = lastLogTime.Add(-streamRefreshPeriod) + logStreamNames = newLogStreamNames + } + lastLogStreamUpdateTime = time.Now() } - if len(prefix) == 0 { + if len(logStreamNames) == 0 { timer.Reset(pollPeriod) continue } - endTime := time.Now().Unix() * 1000 - startTime := lastTimestamp - pollPeriod + endTime := libtime.ToMillis(time.Now()) + logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String(config.Cortex.LogGroup), - LogStreamNamePrefix: aws.String(prefix), - StartTime: aws.Int64(startTime), - EndTime: aws.Int64(endTime), // requires milliseconds - Limit: aws.Int64(int64(maxLogLinesPerRequest)), + LogGroupName: aws.String(logGroupName), + LogStreamNames: aws.StringSlice(logStreamNames.Slice()), + StartTime: aws.Int64(libtime.ToMillis(lastLogTime.Add(-pollPeriod))), + EndTime: aws.Int64(endTime), + Limit: aws.Int64(int64(maxLogLinesPerRequest)), }) if err != nil { - if !awslib.CheckErrCode(err, "ResourceNotFoundException") { - writeString(socket, "error encountered while fetching logs from cloudwatch: "+err.Error()) - closeSocket(socket) + if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) { + writeAndCloseSocket(socket, "error encountered while fetching logs from cloudwatch: "+err.Error()) continue } } - newEvents := strset.New() + lastLogTimestampMillis := libtime.ToMillis(lastLogTime) for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) - - if !previousEvents.Has(*logEvent.EventId) { + if !eventCache.Has(*logEvent.EventId) { socket.WriteMessage(websocket.TextMessage, []byte(log.Log)) - if *logEvent.Timestamp > lastTimestamp { - lastTimestamp = *logEvent.Timestamp + if *logEvent.Timestamp > lastLogTimestampMillis { + lastLogTimestampMillis = *logEvent.Timestamp } + eventCache.Add(*logEvent.EventId) } - newEvents.Add(*logEvent.EventId) } + lastLogTime = libtime.MillisToTime(lastLogTimestampMillis) if len(logEventsOutput.Events) == maxLogLinesPerRequest { - socket.WriteMessage(websocket.TextMessage, []byte("---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----")) - lastTimestamp = endTime + writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----") + lastLogTime = libtime.MillisToTime(endTime) } - previousEvents = newEvents - timer.Reset(pollPeriod * time.Millisecond) + timer.Reset(pollPeriod) } } } -func getPrefix(searchLabels map[string]string) (string, error) { +func getLogStreams(logGroupName string) (strset.Set, error) { + describeLogStreamsOutput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime), + Descending: aws.Bool(true), + LogGroupName: aws.String(logGroupName), + Limit: aws.Int64(maxStreamsPerRequest), + }) + if err != nil { + if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) { + return nil, err + } + return nil, nil + } + + streams := strset.New() + + for _, stream := range describeLogStreamsOutput.LogStreams { + streams.Add(*stream.LogStreamName) + } + return streams, nil +} + +func getPodStartTime(searchLabels map[string]string) (time.Time, error) { pods, err := config.Kubernetes.ListPodsByLabels(searchLabels) if err != nil { - return "", err + return time.Time{}, err } if len(pods) == 0 { - return "", nil + return time.Now(), nil } - podLabels := pods[0].GetLabels() - if apiName, ok := podLabels["apiName"]; ok { - if podTemplateHash, ok := podLabels["pod-template-hash"]; ok { - return internalAPIName(apiName, podLabels["appName"]) + "-" + podTemplateHash, nil + startTime := pods[0].CreationTimestamp.Time + for _, pod := range pods[1:] { + if pod.CreationTimestamp.Time.Before(startTime) { + startTime = pod.CreationTimestamp.Time } - return "", nil // unexpected, pod template hash not set yet } - return pods[0].Name, nil // unexpected, logging non api resources + + return startTime, nil +} + +func getLogGroupName(ctx *context.Context, searchLabels map[string]string) (string, error) { + if searchLabels["workloadType"] == resource.APIType.String() { + return ctx.LogGroupName(searchLabels["apiName"]), nil + } + return "nil", errors.New("unsupported workload type") // unexpected } func writeString(socket *websocket.Conn, message string) { @@ -203,3 +274,8 @@ func closeSocket(socket *websocket.Conn) { socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) time.Sleep(socketCloseGracePeriod) } + +func writeAndCloseSocket(socket *websocket.Conn, message string) { + writeString(socket, message) + closeSocket(socket) +}