Skip to content

Commit c8ee7fd

Browse files
vishalbolludeliahu
authored andcommitted
Merge kubectl logs (#227)
1 parent 4a7032f commit c8ee7fd

File tree

5 files changed

+219
-60
lines changed

5 files changed

+219
-60
lines changed

cli/cmd/get.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,10 @@ func dataResourceTable(resources []context.Resource, dataStatuses map[string]*re
542542
func apiResourceTable(apiGroupStatuses map[string]*resource.APIGroupStatus) string {
543543
rows := make([][]interface{}, 0, len(apiGroupStatuses))
544544
for name, groupStatus := range apiGroupStatuses {
545+
if groupStatus.Requested == 0 {
546+
continue
547+
}
548+
545549
var updatedAt *time.Time
546550
if groupStatus.ActiveStatus != nil {
547551
updatedAt = groupStatus.ActiveStatus.Start

pkg/operator/workloads/logs.go

Lines changed: 202 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package workloads
1818

1919
import (
2020
"bufio"
21+
"bytes"
22+
"encoding/json"
2123
"fmt"
2224
"io"
2325
"os"
@@ -26,19 +28,22 @@ import (
2628
"time"
2729

2830
"github.com/gorilla/websocket"
29-
3031
kcore "k8s.io/api/core/v1"
3132

3233
"github.com/cortexlabs/cortex/pkg/lib/errors"
3334
"github.com/cortexlabs/cortex/pkg/lib/k8s"
3435
"github.com/cortexlabs/cortex/pkg/lib/pointer"
36+
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
3537
"github.com/cortexlabs/cortex/pkg/operator/config"
3638
)
3739

3840
const (
39-
writeWait = 10 * time.Second
40-
closeGracePeriod = 10 * time.Second
41-
maxMessageSize = 8192
41+
writeWait = 10 * time.Second
42+
closeGracePeriod = 10 * time.Second
43+
maxMessageSize = 8192
44+
podCheckInterval = 5 * time.Second
45+
maxParallelPodLogging = 5
46+
initLogTailLines = 20
4247
)
4348

4449
func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket.Conn) {
@@ -56,40 +61,46 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
5661
}
5762

5863
if len(pods) > 0 {
59-
if len(pods) > 1 {
60-
if !writeSocket(fmt.Sprintf("%d pods available, streaming logs for one of them:", len(pods)), socket) {
64+
if len(pods) > maxParallelPodLogging {
65+
if !writeSocket(fmt.Sprintf("\n%d pods available, streaming logs for %d of them:", len(pods), maxParallelPodLogging), socket) {
6166
return
6267
}
6368
}
6469

6570
podMap := make(map[k8s.PodStatus][]kcore.Pod)
6671
for _, pod := range pods {
67-
podMap[k8s.GetPodStatus(&pod)] = append(podMap[k8s.GetPodStatus(&pod)], pod)
72+
podStatus := k8s.GetPodStatus(&pod)
73+
if len(podMap[podStatus]) < maxParallelPodLogging {
74+
podMap[podStatus] = append(podMap[podStatus], pod)
75+
}
6876
}
6977

7078
switch {
7179
case len(podMap[k8s.PodStatusSucceeded]) > 0:
72-
getKubectlLogs(&podMap[k8s.PodStatusSucceeded][0], verbose, wrotePending, false, socket)
80+
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, false, socket)
7381
case len(podMap[k8s.PodStatusRunning]) > 0:
74-
getKubectlLogs(&podMap[k8s.PodStatusRunning][0], verbose, wrotePending, false, socket)
82+
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, false, socket)
7583
case len(podMap[k8s.PodStatusPending]) > 0:
76-
getKubectlLogs(&podMap[k8s.PodStatusPending][0], verbose, wrotePending, false, socket)
84+
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, false, socket)
7785
case len(podMap[k8s.PodStatusKilled]) > 0:
78-
getKubectlLogs(&podMap[k8s.PodStatusKilled][0], verbose, wrotePending, false, socket)
86+
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, false, socket)
7987
case len(podMap[k8s.PodStatusKilledOOM]) > 0:
80-
getKubectlLogs(&podMap[k8s.PodStatusKilledOOM][0], verbose, wrotePending, false, socket)
88+
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, false, socket)
8189
case len(podMap[k8s.PodStatusFailed]) > 0:
8290
previous := false
8391
if pods[0].Labels["workloadType"] == WorkloadTypeAPI {
8492
previous = true
8593
}
86-
getKubectlLogs(&podMap[k8s.PodStatusFailed][0], verbose, wrotePending, previous, socket)
94+
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, previous, socket)
8795
case len(podMap[k8s.PodStatusTerminating]) > 0:
88-
getKubectlLogs(&podMap[k8s.PodStatusTerminating][0], verbose, wrotePending, false, socket)
96+
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, false, socket)
8997
case len(podMap[k8s.PodStatusUnknown]) > 0:
90-
getKubectlLogs(&podMap[k8s.PodStatusUnknown][0], verbose, wrotePending, false, socket)
98+
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, false, socket)
9199
default: // unexpected
92-
getKubectlLogs(&pods[0], verbose, wrotePending, false, socket)
100+
if len(pods) > maxParallelPodLogging {
101+
pods = pods[:maxParallelPodLogging]
102+
}
103+
getKubectlLogs(pods, verbose, wrotePending, false, socket)
93104
}
94105
return
95106
}
@@ -118,7 +129,7 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
118129
if !writeSocket("\nFailed to start:\n", socket) {
119130
return
120131
}
121-
getKubectlLogs(failedArgoPod, true, false, false, socket)
132+
getKubectlLogs([]kcore.Pod{*failedArgoPod}, true, false, false, socket)
122133
return
123134
}
124135

@@ -133,52 +144,169 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
133144
}
134145
}
135146

136-
func getKubectlLogs(pod *kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
137-
cmdPath := "/usr/local/bin/kubectl"
147+
func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
148+
isAllPending := true
149+
for _, pod := range pods {
150+
if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
151+
isAllPending = false
152+
break
153+
}
154+
}
138155

139-
if k8s.GetPodStatus(pod) == k8s.PodStatusPending {
140-
if !wrotePending {
141-
if !writeSocket("\nPending", socket) {
142-
return
143-
}
156+
if isAllPending {
157+
if !writeSocket("\nPending", socket) {
158+
return
144159
}
145-
config.Kubernetes.WaitForPodRunning(pod.Name, 1)
146160
}
147161

148-
args := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
162+
inr, inw, err := os.Pipe()
163+
if err != nil {
164+
errors.Panic(err, "logs", "kubectl", "os.pipe")
165+
}
166+
defer inw.Close()
167+
defer inr.Close()
168+
169+
podCheckCancel := make(chan struct{})
170+
defer close(podCheckCancel)
171+
172+
go podCheck(podCheckCancel, socket, pods, previous, verbose, inr)
173+
pumpStdin(socket, inw)
174+
podCheckCancel <- struct{}{}
175+
}
176+
177+
func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) (*os.Process, error) {
178+
cmdPath := "/bin/bash"
179+
180+
kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
149181
if previous {
150-
args = append(args, "--previous")
182+
kubectlArgs = append(kubectlArgs, "--previous")
151183
}
152184

153-
args = append(args, pod.Name)
185+
identifier := pod.Name
186+
kubectlArgs = append(kubectlArgs, pod.Name)
154187
if pod.Labels["workloadType"] == WorkloadTypeAPI && pod.Labels["userFacing"] == "true" {
155-
args = append(args, apiContainerName)
188+
kubectlArgs = append(kubectlArgs, apiContainerName)
189+
kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
190+
identifier += " " + apiContainerName
156191
}
157192

158-
outr, outw, err := os.Pipe()
193+
labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY \" | tail -n +1; done", identifier)
194+
kubectlCmd := strings.Join(kubectlArgs, " ")
195+
bashArgs := []string{"/bin/bash", "-c", kubectlCmd + labelLog}
196+
process, err := os.StartProcess(cmdPath, bashArgs, attrs)
159197
if err != nil {
160-
errors.Panic(err, "logs", "kubectl", "os.pipe")
198+
return nil, errors.Wrap(err, strings.Join(bashArgs, " "))
161199
}
162-
defer outr.Close()
163-
defer outw.Close()
164200

165-
inr, inw, err := os.Pipe()
201+
return process, nil
202+
}
203+
204+
func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, inr *os.File) {
205+
timer := time.NewTimer(0)
206+
defer timer.Stop()
207+
208+
processMap := make(map[string]*os.Process)
209+
defer deleteProcesses(processMap)
210+
labels := initialPodList[0].GetLabels()
211+
appName := labels["appName"]
212+
workloadID := labels["workloadID"]
213+
214+
outr, outw, err := os.Pipe()
166215
if err != nil {
167216
errors.Panic(err, "logs", "kubectl", "os.pipe")
168217
}
169-
defer inr.Close()
170-
defer inw.Close()
218+
defer outw.Close()
219+
defer outr.Close()
171220

172-
process, err := os.StartProcess(cmdPath, args, &os.ProcAttr{
173-
Files: []*os.File{inr, outw, outw},
174-
})
175-
if err != nil {
176-
errors.Panic(err, strings.Join(args, " "))
221+
socketWriterError := make(chan error, 1)
222+
defer close(socketWriterError)
223+
224+
go pumpStdout(socket, socketWriterError, outr, verbose, true)
225+
226+
for {
227+
select {
228+
case <-podCheckCancel:
229+
return
230+
case <-timer.C:
231+
pods, err := config.Kubernetes.ListPodsByLabels(map[string]string{
232+
"appName": appName,
233+
"workloadID": workloadID,
234+
"userFacing": "true",
235+
})
236+
237+
if err != nil {
238+
socketWriterError <- errors.Wrap(err, "pod check")
239+
return
240+
}
241+
242+
latestRunningPodsMap := make(map[string]kcore.Pod)
243+
latestRunningPods := strset.New()
244+
for _, pod := range pods {
245+
if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
246+
latestRunningPods.Add(pod.GetName())
247+
latestRunningPodsMap[pod.GetName()] = pod
248+
}
249+
}
250+
251+
prevRunningPods := strset.New()
252+
for podName := range processMap {
253+
prevRunningPods.Add(podName)
254+
}
255+
256+
newPods := strset.Difference(latestRunningPods, prevRunningPods)
257+
podsToDelete := strset.Difference(prevRunningPods, latestRunningPods)
258+
podsToKeep := strset.Intersection(prevRunningPods, latestRunningPods)
259+
260+
// Prioritize adding running pods
261+
podsToAddRunning := []string{}
262+
podsToAddNotRunning := []string{}
263+
264+
for podName := range newPods {
265+
pod := latestRunningPodsMap[podName]
266+
if k8s.GetPodStatus(&pod) == k8s.PodStatusRunning {
267+
podsToAddRunning = append(podsToAddRunning, podName)
268+
} else {
269+
podsToAddNotRunning = append(podsToAddNotRunning, podName)
270+
}
271+
}
272+
podsToAdd := append(podsToAddRunning, podsToAddNotRunning...)
273+
274+
maxPodsToAdd := maxParallelPodLogging - len(podsToKeep)
275+
if len(podsToAdd) < maxPodsToAdd {
276+
maxPodsToAdd = len(podsToAdd)
277+
}
278+
279+
for _, podName := range podsToAdd[:maxPodsToAdd] {
280+
process, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
281+
Files: []*os.File{inr, outw, outw},
282+
})
283+
if err != nil {
284+
socketWriterError <- err
285+
return
286+
}
287+
processMap[podName] = process
288+
}
289+
290+
deleteMap := make(map[string]*os.Process, len(podsToDelete))
291+
292+
for podName := range podsToDelete {
293+
deleteMap[podName] = processMap[podName]
294+
delete(processMap, podName)
295+
}
296+
deleteProcesses(deleteMap)
297+
timer.Reset(podCheckInterval)
298+
}
177299
}
300+
}
178301

179-
go pumpStdout(socket, outr, verbose, true)
180-
pumpStdin(socket, inw)
181-
stopProcess(process)
302+
func deleteProcesses(processMap map[string]*os.Process) {
303+
for _, process := range processMap {
304+
process.Signal(os.Interrupt)
305+
}
306+
time.Sleep(5 * time.Second)
307+
for _, process := range processMap {
308+
process.Signal(os.Kill)
309+
}
182310
}
183311

184312
func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
@@ -196,7 +324,10 @@ func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
196324
} else {
197325
logsReader = strings.NewReader(logs)
198326
}
199-
go pumpStdout(socket, logsReader, verbose, false)
327+
328+
socketWriterError := make(chan error)
329+
defer close(socketWriterError)
330+
go pumpStdout(socket, socketWriterError, logsReader, verbose, false)
200331

201332
inr, inw, err := os.Pipe()
202333
if err != nil {
@@ -224,7 +355,7 @@ func pumpStdin(socket *websocket.Conn, writer io.Writer) {
224355
}
225356
}
226357

227-
func pumpStdout(socket *websocket.Conn, reader io.Reader, verbose bool, checkForLastLog bool) {
358+
func pumpStdout(socket *websocket.Conn, socketWriterError chan error, reader io.Reader, verbose bool, checkForLastLog bool) {
228359
scanner := bufio.NewScanner(reader)
229360
for scanner.Scan() {
230361
socket.SetWriteDeadline(time.Now().Add(writeWait))
@@ -243,14 +374,23 @@ func pumpStdout(socket *websocket.Conn, reader io.Reader, verbose bool, checkFor
243374
}
244375
}
245376

377+
select {
378+
case err := <-socketWriterError:
379+
if err != nil {
380+
writeSocket(err.Error(), socket)
381+
}
382+
default:
383+
}
384+
246385
socket.SetWriteDeadline(time.Now().Add(writeWait))
247386
socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
248387
time.Sleep(closeGracePeriod)
249388
socket.Close()
250389
}
251390

252-
var cortexRegex = regexp.MustCompile(`^?(DEBUG|INFO|WARNING|ERROR|CRITICAL):cortex:`)
253-
var tensorflowRegex = regexp.MustCompile(`^?(DEBUG|INFO|WARNING|ERROR|CRITICAL):tensorflow:`)
391+
var cortexRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):cortex:`)
392+
var tensorflowRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):tensorflow:`)
393+
var jsonPrefixRegex = regexp.MustCompile(`^\ *?(\{|\[)`)
254394

255395
func formatHeader1(headerString string) *string {
256396
headerBorder := "\n" + strings.Repeat("-", len(headerString)) + "\n"
@@ -299,6 +439,20 @@ func extractFromCortexLog(match string, loglevel string, logStr string) (*string
299439
return formatHeader3(cutStr), false
300440
}
301441

442+
matches := jsonPrefixRegex.FindStringSubmatch(cutStr)
443+
if len(matches) == 2 {
444+
indentIndex := len(matches[0]) - 1 // matches to curly or square bracket so remove the last char
445+
indentStr := cutStr[:indentIndex]
446+
maybeJSON := cutStr[indentIndex:]
447+
jsonBytes := []byte(maybeJSON)
448+
var obytes bytes.Buffer
449+
err := json.Indent(&obytes, jsonBytes, indentStr, " ")
450+
if err == nil {
451+
ostr := indentStr + string(obytes.String())
452+
return &ostr, false
453+
}
454+
}
455+
302456
lastLogRe := regexp.MustCompile(`^workload: (\w+), completed: (\S+)`)
303457
if lastLogRe.MatchString(cutStr) {
304458
return &cutStr, true

pkg/workloads/cortex/lib/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def log_pretty(obj, indent=0, logging_func=logger.info):
118118

119119

120120
def log_pretty_flat(obj, indent=0, logging_func=logger.info):
121-
logging_func(pp_str_flat(obj), indent)
121+
logging_func(pp_str_flat(obj, indent))
122122

123123

124124
def pluralize(num, singular, plural):

0 commit comments

Comments
 (0)