@@ -25,10 +25,10 @@ import (
25
25
"github.com/gorilla/websocket"
26
26
27
27
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
28
- "github.com/cortexlabs/cortex/pkg/lib/k8s"
29
28
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
30
29
s "github.com/cortexlabs/cortex/pkg/lib/strings"
31
30
"github.com/cortexlabs/cortex/pkg/operator/api/context"
31
+ "github.com/cortexlabs/cortex/pkg/operator/api/resource"
32
32
"github.com/cortexlabs/cortex/pkg/operator/config"
33
33
)
34
34
@@ -38,7 +38,7 @@ const (
38
38
socketMaxMessageSize = 8192
39
39
40
40
maxLogLinesPerRequest = 500
41
- pollPeriod = 250 * time . Millisecond
41
+ pollPeriod = 250 // milliseconds
42
42
)
43
43
44
44
func ReadLogs (appName string , podLabels map [string ]string , socket * websocket.Conn ) {
@@ -69,7 +69,6 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
69
69
70
70
lastTimestamp := int64 (0 )
71
71
previousEvents := strset .New ()
72
- wrotePending := false
73
72
74
73
var currentContextID string
75
74
var prefix string
@@ -91,45 +90,30 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
91
90
92
91
if ctx .ID != currentContextID {
93
92
if len (currentContextID ) != 0 {
94
- if apiName , ok := podLabels ["apiName" ]; ok {
93
+ if podLabels ["workloadType" ] == resource .APIType .String () {
94
+ apiName := podLabels ["apiName" ]
95
95
if _ , ok := ctx .APIs [apiName ]; ! ok {
96
96
writeString (socket , "\n api " + apiName + " was not found in latest deployment" )
97
97
closeSocket (socket )
98
98
continue
99
99
}
100
- podLabels ["workloadID" ] = ctx .APIs [apiName ].WorkloadID
101
100
writeString (socket , "\n a new deployment was detected, streaming logs from the latest deployment" )
102
101
} else {
103
- writeString (socket , "\n new deployment detected, shutting down log stream " ) // unexpected for now, should only occur when logging non api resources
102
+ writeString (socket , "\n logging non-api workloads is not supported " ) // unexpected
104
103
closeSocket (socket )
105
104
continue
106
105
}
106
+ } else {
107
+ lastTimestamp = ctx .CreatedEpoch * 1000
107
108
}
108
109
109
- currentContextID = ctx .ID
110
-
111
- pods , err := config .Kubernetes .ListPodsByLabels (podLabels )
112
- if err != nil {
113
- writeString (socket , err .Error ())
114
- closeSocket (socket )
115
- continue
110
+ if podLabels ["workloadType" ] == resource .APIType .String () {
111
+ podLabels ["workloadID" ] = ctx .APIs [podLabels ["apiName" ]].WorkloadID
116
112
}
117
113
118
- allPodsPending := true
119
- for _ , pod := range pods {
120
- if k8s .GetPodStatus (& pod ) != k8s .PodStatusPending {
121
- allPodsPending = false
122
- break
123
- }
124
- }
125
-
126
- if allPodsPending {
127
- writeString (socket , "\n pending..." )
128
- wrotePending = true
129
- } else {
130
- wrotePending = false
131
- }
114
+ currentContextID = ctx .ID
132
115
116
+ writeString (socket , "\n retrieving logs..." )
133
117
prefix = ""
134
118
}
135
119
@@ -147,21 +131,18 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
147
131
continue
148
132
}
149
133
134
+ endTime := time .Now ().Unix () * 1000
135
+ startTime := lastTimestamp - pollPeriod
150
136
logEventsOutput , err := config .AWS .CloudWatchLogsClient .FilterLogEvents (& cloudwatchlogs.FilterLogEventsInput {
151
137
LogGroupName : aws .String (config .Cortex .LogGroup ),
152
138
LogStreamNamePrefix : aws .String (prefix ),
153
- StartTime : aws .Int64 (lastTimestamp ),
154
- EndTime : aws .Int64 (time . Now (). Unix () * 1000 ), // requires milliseconds
139
+ StartTime : aws .Int64 (startTime ),
140
+ EndTime : aws .Int64 (endTime ), // requires milliseconds
155
141
Limit : aws .Int64 (int64 (maxLogLinesPerRequest )),
156
142
})
157
143
158
144
if err != nil {
159
- if awslib .CheckErrCode (err , "ResourceNotFoundException" ) {
160
- if ! wrotePending {
161
- writeString (socket , "pending..." )
162
- wrotePending = true
163
- }
164
- } else {
145
+ if ! awslib .CheckErrCode (err , "ResourceNotFoundException" ) {
165
146
writeString (socket , "error encountered while fetching logs from cloudwatch: " + err .Error ())
166
147
closeSocket (socket )
167
148
continue
@@ -175,7 +156,6 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
175
156
176
157
if ! previousEvents .Has (* logEvent .EventId ) {
177
158
socket .WriteMessage (websocket .TextMessage , []byte (log .Log ))
178
-
179
159
if * logEvent .Timestamp > lastTimestamp {
180
160
lastTimestamp = * logEvent .Timestamp
181
161
}
@@ -185,10 +165,11 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel
185
165
186
166
if len (logEventsOutput .Events ) == maxLogLinesPerRequest {
187
167
socket .WriteMessage (websocket .TextMessage , []byte ("---- Showing at most " + s .Int (maxLogLinesPerRequest )+ " lines. Visit AWS cloudwatch logs console and search for \" " + prefix + "\" in log group \" " + config .Cortex .LogGroup + "\" for complete logs ----" ))
168
+ lastTimestamp = endTime
188
169
}
189
170
190
171
previousEvents = newEvents
191
- timer .Reset (pollPeriod )
172
+ timer .Reset (pollPeriod * time . Millisecond )
192
173
}
193
174
}
194
175
}
@@ -214,7 +195,6 @@ func getPrefix(searchLabels map[string]string) (string, error) {
214
195
}
215
196
216
197
func writeString (socket * websocket.Conn , message string ) {
217
- socket .SetWriteDeadline (time .Now ().Add (socketWriteDeadlineWait ))
218
198
socket .WriteMessage (websocket .TextMessage , []byte (message ))
219
199
}
220
200
0 commit comments