Skip to content

Create loggroup per api stream from log group #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/versions.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ Note: overriding horizontal-pod-autoscaler-sync-period on EKS is currently not s

1. Find the latest release on [Dockerhub](https://hub.docker.com/r/fluent/fluentd-kubernetes-daemonset/)
1. Update the base image version in `images/fluentd/Dockerfile`
1. Update record-modifier in `images/fluentd/Dockerfile` to the latest version [here](https://github.com/repeatedly/fluent-plugin-record-modifier/blob/master/VERSION)
1. Update `fluentd.yaml` as necessary (make sure to maintain all Cortex environment variables)

## Statsd
Expand Down
2 changes: 1 addition & 1 deletion docs/cluster/uninstall.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ aws s3 ls
aws s3 rb --force s3://<bucket-name>

# Delete the log group
aws logs delete-log-group --log-group-name cortex --region us-west-2
aws logs describe-log-groups --log-group-name-prefix=<log_group_name> --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {}
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8
github.com/ugorji/go/codec v1.1.7
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
3 changes: 1 addition & 2 deletions images/fluentd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0
RUN fluent-gem install fluent-plugin-grep
RUN fluent-gem install fluent-plugin-route
RUN fluent-gem install fluent-plugin-record-modifier -v 2.0.1 --no-document
7 changes: 4 additions & 3 deletions manager/manifests/fluentd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ data:
</filter>

<filter **>
@type record_transformer
enable_ruby
@type record_modifier
<record>
group_name ${record.dig("kubernetes", "labels", "logGroupName") || ENV['LOG_GROUP_NAME']}
stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")}
log ${record.dig("log").rstrip}
</record>
Expand All @@ -94,9 +94,10 @@ data:
<match **>
@type cloudwatch_logs
region "#{ENV['AWS_REGION']}"
log_group_name "#{ENV['LOG_GROUP_NAME']}"
log_group_name_key group_name
log_stream_name_key stream_name
remove_log_stream_name_key true
remove_log_group_name_key true
auto_create_stream true
<buffer>
flush_interval 2
Expand Down
10 changes: 10 additions & 0 deletions pkg/lib/time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ func LocalHourNow() string {
func OlderThanSeconds(t time.Time, secs float64) bool {
return time.Since(t).Seconds() > secs
}

func MillisToTime(epochMillis int64) time.Time {
seconds := epochMillis / 1000
millis := epochMillis % 1000
return time.Unix(seconds, millis*int64(time.Millisecond))
}

func ToMillis(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}
5 changes: 5 additions & 0 deletions pkg/operator/api/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ func (ctx *Context) VisibleResourceByNameAndType(name string, resourceTypeStr st
return nil, resource.ErrorInvalidType(resourceTypeStr)
}

// LogGroupName returns the CloudWatch log group name for the given API,
// namespaced under the cluster-wide log group and the deployment (app) name
// as "<cluster log group>.<app name>.<api name>".
func (ctx *Context) LogGroupName(apiName string) string {
	return ctx.CortexConfig.LogGroup + "." + ctx.App.Name + "." + apiName
}

func (ctx *Context) Validate() error {
return nil
}
2 changes: 2 additions & 0 deletions pkg/operator/workloads/api_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func tfAPISpec(
"resourceID": ctx.APIs[api.Name].ID,
"workloadID": workloadID,
"userFacing": "true",
"logGroupName": ctx.LogGroupName(api.Name),
},
Annotations: map[string]string{
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",
Expand Down Expand Up @@ -482,6 +483,7 @@ func onnxAPISpec(
"resourceID": ctx.APIs[api.Name].ID,
"workloadID": workloadID,
"userFacing": "true",
"logGroupName": ctx.LogGroupName(api.Name),
},
Annotations: map[string]string{
"traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0",
Expand Down
178 changes: 127 additions & 51 deletions pkg/operator/workloads/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/gorilla/websocket"
"gopkg.in/karalabe/cookiejar.v2/collections/deque"

awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
"github.com/cortexlabs/cortex/pkg/operator/api/context"
"github.com/cortexlabs/cortex/pkg/operator/api/resource"
"github.com/cortexlabs/cortex/pkg/operator/config"
Expand All @@ -37,10 +40,40 @@ const (
socketCloseGracePeriod = 10 * time.Second
socketMaxMessageSize = 8192

maxCacheSize = 10000
maxLogLinesPerRequest = 500
pollPeriod = 250 // milliseconds
maxStreamsPerRequest = 50
pollPeriod = 250 * time.Millisecond
streamRefreshPeriod = 2 * time.Second
)

// eventCache remembers recently seen CloudWatch log event IDs so that
// duplicate events (returned by overlapping FilterLogEvents windows) are not
// re-sent to the client. Once the configured capacity is reached, the oldest
// tracked ID is evicted FIFO.
type eventCache struct {
	size       int          // maximum number of event IDs retained
	seen       strset.Set   // membership lookup for tracked event IDs
	eventQueue *deque.Deque // event IDs in insertion order, for FIFO eviction
}

// newEventCache returns an empty cache that tracks at most cacheSize event IDs.
func newEventCache(cacheSize int) eventCache {
	return eventCache{
		size:       cacheSize,
		seen:       strset.New(),
		eventQueue: deque.New(),
	}
}

// Has reports whether eventID is currently tracked by the cache.
func (c *eventCache) Has(eventID string) bool {
	return c.seen.Has(eventID)
}

// Add records eventID, first evicting the oldest tracked ID if the cache is
// already at capacity. Callers are expected to check Has before adding.
func (c *eventCache) Add(eventID string) {
	if c.eventQueue.Size() == c.size {
		oldest := c.eventQueue.PopLeft().(string)
		c.seen.Remove(oldest)
	}
	c.seen.Add(eventID)
	c.eventQueue.PushRight(eventID)
}

func ReadLogs(appName string, podLabels map[string]string, socket *websocket.Conn) {
podCheckCancel := make(chan struct{})
defer close(podCheckCancel)
Expand All @@ -67,14 +100,29 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
timer := time.NewTimer(0)
defer timer.Stop()

lastTimestamp := int64(0)
previousEvents := strset.New()
lastLogTime := time.Now()
lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod)

logStreamNames := strset.New()

var currentContextID string
var prefix string
var ctx *context.Context
var err error

var ctx = CurrentContext(appName)
eventCache := newEventCache(maxCacheSize)

if ctx == nil {
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
return
}

logGroupName, err := getLogGroupName(ctx, podLabels)
if err != nil {
writeAndCloseSocket(socket, err.Error()) // unexpected
return
}

for {
select {
case <-podCheckCancel:
Expand All @@ -83,8 +131,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
ctx = CurrentContext(appName)

if ctx == nil {
writeString(socket, "\ndeployment "+appName+" not found")
closeSocket(socket)
writeAndCloseSocket(socket, "\ndeployment "+appName+" not found")
continue
}

Expand All @@ -93,105 +140,129 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
if podLabels["workloadType"] == resource.APIType.String() {
apiName := podLabels["apiName"]
if _, ok := ctx.APIs[apiName]; !ok {
writeString(socket, "\napi "+apiName+" was not found in latest deployment")
closeSocket(socket)
writeAndCloseSocket(socket, "\napi "+apiName+" was not found in latest deployment")
continue
}
writeString(socket, "\na new deployment was detected, streaming logs from the latest deployment")
} else {
writeString(socket, "\nlogging non-api workloads is not supported") // unexpected
closeSocket(socket)
writeAndCloseSocket(socket, "\nlogging non-api workloads is not supported") // unexpected
continue
}
} else {
lastTimestamp = ctx.CreatedEpoch * 1000
}

if podLabels["workloadType"] == resource.APIType.String() {
podLabels["workloadID"] = ctx.APIs[podLabels["apiName"]].WorkloadID
lastLogTime, _ = getPodStartTime(podLabels)
}

currentContextID = ctx.ID

writeString(socket, "\nretrieving logs...")
prefix = ""
}

if len(prefix) == 0 {
prefix, err = getPrefix(podLabels)
if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) {
newLogStreamNames, err := getLogStreams(logGroupName)
if err != nil {
writeString(socket, err.Error())
closeSocket(socket)
writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error())
continue
}

if !logStreamNames.IsEqual(newLogStreamNames) {
lastLogTime = lastLogTime.Add(-streamRefreshPeriod)
logStreamNames = newLogStreamNames
}
lastLogStreamUpdateTime = time.Now()
}

if len(prefix) == 0 {
if len(logStreamNames) == 0 {
timer.Reset(pollPeriod)
continue
}

endTime := time.Now().Unix() * 1000
startTime := lastTimestamp - pollPeriod
endTime := libtime.ToMillis(time.Now())

logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String(config.Cortex.LogGroup),
LogStreamNamePrefix: aws.String(prefix),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(endTime), // requires milliseconds
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
LogGroupName: aws.String(logGroupName),
LogStreamNames: aws.StringSlice(logStreamNames.Slice()),
StartTime: aws.Int64(libtime.ToMillis(lastLogTime.Add(-pollPeriod))),
EndTime: aws.Int64(endTime),
Limit: aws.Int64(int64(maxLogLinesPerRequest)),
})

if err != nil {
if !awslib.CheckErrCode(err, "ResourceNotFoundException") {
writeString(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
closeSocket(socket)
if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
writeAndCloseSocket(socket, "error encountered while fetching logs from cloudwatch: "+err.Error())
continue
}
}

newEvents := strset.New()
lastLogTimestampMillis := libtime.ToMillis(lastLogTime)
for _, logEvent := range logEventsOutput.Events {
var log FluentdLog
json.Unmarshal([]byte(*logEvent.Message), &log)

if !previousEvents.Has(*logEvent.EventId) {
if !eventCache.Has(*logEvent.EventId) {
socket.WriteMessage(websocket.TextMessage, []byte(log.Log))
if *logEvent.Timestamp > lastTimestamp {
lastTimestamp = *logEvent.Timestamp
if *logEvent.Timestamp > lastLogTimestampMillis {
lastLogTimestampMillis = *logEvent.Timestamp
}
eventCache.Add(*logEvent.EventId)
}
newEvents.Add(*logEvent.EventId)
}

lastLogTime = libtime.MillisToTime(lastLogTimestampMillis)
if len(logEventsOutput.Events) == maxLogLinesPerRequest {
socket.WriteMessage(websocket.TextMessage, []byte("---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----"))
lastTimestamp = endTime
writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----")
lastLogTime = libtime.MillisToTime(endTime)
}

previousEvents = newEvents
timer.Reset(pollPeriod * time.Millisecond)
timer.Reset(pollPeriod)
}
}
}

func getPrefix(searchLabels map[string]string) (string, error) {
// getLogStreams returns the names of up to maxStreamsPerRequest log streams
// in logGroupName, ordered by most recent event. It returns a nil set with a
// nil error when the log group does not exist yet (it may not have been
// created by fluentd at this point).
func getLogStreams(logGroupName string) (strset.Set, error) {
	output, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
		OrderBy:      aws.String(cloudwatchlogs.OrderByLastEventTime),
		Descending:   aws.Bool(true),
		LogGroupName: aws.String(logGroupName),
		Limit:        aws.Int64(maxStreamsPerRequest),
	})
	if err != nil {
		if awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) {
			// log group hasn't been created yet; treat as "no streams"
			return nil, nil
		}
		return nil, err
	}

	streamNames := strset.New()
	for _, logStream := range output.LogStreams {
		streamNames.Add(*logStream.LogStreamName)
	}
	return streamNames, nil
}

func getPodStartTime(searchLabels map[string]string) (time.Time, error) {
pods, err := config.Kubernetes.ListPodsByLabels(searchLabels)
if err != nil {
return "", err
return time.Time{}, err
}

if len(pods) == 0 {
return "", nil
return time.Now(), nil
}

podLabels := pods[0].GetLabels()
if apiName, ok := podLabels["apiName"]; ok {
if podTemplateHash, ok := podLabels["pod-template-hash"]; ok {
return internalAPIName(apiName, podLabels["appName"]) + "-" + podTemplateHash, nil
startTime := pods[0].CreationTimestamp.Time
for _, pod := range pods[1:] {
if pod.CreationTimestamp.Time.Before(startTime) {
startTime = pod.CreationTimestamp.Time
}
return "", nil // unexpected, pod template hash not set yet
}
return pods[0].Name, nil // unexpected, logging non api resources

return startTime, nil
}

// getLogGroupName returns the CloudWatch log group name for the workload
// identified by searchLabels. Only API workloads are supported; any other
// workload type yields an error (this is unexpected, since callers only
// stream logs for APIs).
func getLogGroupName(ctx *context.Context, searchLabels map[string]string) (string, error) {
	if searchLabels["workloadType"] == resource.APIType.String() {
		return ctx.LogGroupName(searchLabels["apiName"]), nil
	}
	// bug fix: return the zero value "" on error, not the literal string "nil"
	return "", errors.New("unsupported workload type") // unexpected
}

func writeString(socket *websocket.Conn, message string) {
Expand All @@ -203,3 +274,8 @@ func closeSocket(socket *websocket.Conn) {
socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
time.Sleep(socketCloseGracePeriod)
}

// writeAndCloseSocket sends message to the client and then initiates a
// normal websocket close (closeSocket sleeps for a grace period so the
// client has a chance to receive the close frame).
func writeAndCloseSocket(socket *websocket.Conn, message string) {
	writeString(socket, message)
	closeSocket(socket)
}