Skip to content
This repository was archived by the owner on Mar 9, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions pkg/server/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,22 @@ import (
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
func (c *criContainerdService) ContainerStats(ctx context.Context, in *runtime.ContainerStatsRequest) (*runtime.ContainerStatsResponse, error) {
// Validate the stats request
if in.GetContainerId() == "" {
return nil, fmt.Errorf("invalid container stats request")
}
containerID := in.GetContainerId()
_, err := c.containerStore.Get(containerID)
cntr, err := c.containerStore.Get(in.GetContainerId())
if err != nil {
return nil, fmt.Errorf("failed to find container %q: %v", containerID, err)
return nil, fmt.Errorf("failed to find container: %v", err)
}
request := &tasks.MetricsRequest{Filters: []string{"id==" + containerID}}
request := &tasks.MetricsRequest{Filters: []string{"id==" + cntr.ID}}
resp, err := c.taskService.Metrics(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err)
return nil, fmt.Errorf("failed to fetch metrics for task: %v", err)
}
if len(resp.Metrics) != 1 {
return nil, fmt.Errorf("unexpected metrics response: %+v", resp.Metrics)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add containerID

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking care of this

}

var cs runtime.ContainerStats
if err := c.getContainerMetrics(containerID, resp.Metrics[0], &cs); err != nil {
cs, err := c.getContainerMetrics(cntr.Metadata, resp.Metrics[0])
if err != nil {
return nil, fmt.Errorf("failed to decode container metrics: %v", err)
}
return &runtime.ContainerStatsResponse{Stats: &cs}, nil
return &runtime.ContainerStatsResponse{Stats: cs}, nil
}
84 changes: 34 additions & 50 deletions pkg/server/container_stats_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/typeurl"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"

containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
)

// ListContainerStats returns stats of all running containers.
func (c *criContainerdService) ListContainerStats(
ctx context.Context,
in *runtime.ListContainerStatsRequest,
) (*runtime.ListContainerStatsResponse, error) {
request, candidateContainers, err := c.buildTaskMetricsRequest(in)
request, containers, err := c.buildTaskMetricsRequest(in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do truncindex here? There is conainerID in filter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't looked at your PR yet. Will do after the v1.0.0-alpha.0 release, and we could discuss there. :)

if err != nil {
return nil, fmt.Errorf("failed to build metrics request: %v", err)
}
resp, err := c.taskService.Metrics(ctx, &request)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err)
}
criStats, err := c.toCRIContainerStats(resp.Metrics, candidateContainers)
criStats, err := c.toCRIContainerStats(resp.Metrics, containers)
if err != nil {
return nil, fmt.Errorf("failed to convert to cri containerd stats format: %v", err)
}
Expand All @@ -50,39 +51,30 @@ func (c *criContainerdService) ListContainerStats(

func (c *criContainerdService) toCRIContainerStats(
stats []*types.Metric,
candidateContainers map[string]bool,
containers []containerstore.Container,
) (*runtime.ListContainerStatsResponse, error) {
containerStats := new(runtime.ListContainerStatsResponse)
statsMap := make(map[string]*types.Metric)
for _, stat := range stats {
var cs runtime.ContainerStats
if err := c.getContainerMetrics(stat.ID, stat, &cs); err != nil {
glog.Errorf("failed to decode container metrics: %v", err)
continue
}
delete(candidateContainers, stat.ID)
containerStats.Stats = append(containerStats.Stats, &cs)
statsMap[stat.ID] = stat
}
// If there is a state where containers are dead at the time of query
// but not removed, then check if the writeableLayer information
// is present and attach the same.
for id := range candidateContainers {
var cs runtime.ContainerStats
if err := c.getContainerMetrics(id, nil, &cs); err != nil {
glog.Errorf("failed to decode container metrics: %v", err)
continue
containerStats := new(runtime.ListContainerStatsResponse)
for _, cntr := range containers {
cs, err := c.getContainerMetrics(cntr.Metadata, statsMap[cntr.ID])
if err != nil {
return nil, fmt.Errorf("failed to decode container metrics for %q: %v", cntr.ID, err)
}
containerStats.Stats = append(containerStats.Stats, &cs)
containerStats.Stats = append(containerStats.Stats, cs)
}
return containerStats, nil
}

func (c *criContainerdService) getContainerMetrics(
containerID string,
meta containerstore.Metadata,
stats *types.Metric,
cs *runtime.ContainerStats,
) error {
) (*runtime.ContainerStats, error) {
var cs runtime.ContainerStats
var usedBytes, inodesUsed uint64
sn, err := c.snapshotStore.Get(containerID)
sn, err := c.snapshotStore.Get(meta.ID)
// If snapshotstore doesn't have cached snapshot information
// set WritableLayer usage to zero
if err == nil {
Expand All @@ -97,23 +89,17 @@ func (c *criContainerdService) getContainerMetrics(
UsedBytes: &runtime.UInt64Value{usedBytes},
InodesUsed: &runtime.UInt64Value{inodesUsed},
}

// Get the container from store and extract the attributes
cnt, err := c.containerStore.Get(containerID)
if err != nil {
return fmt.Errorf("failed to find container %q in container store: %v", containerID, err)
}
cs.Attributes = &runtime.ContainerAttributes{
Id: containerID,
Metadata: cnt.Config.GetMetadata(),
Labels: cnt.Config.GetLabels(),
Annotations: cnt.Config.GetAnnotations(),
Id: meta.ID,
Metadata: meta.Config.GetMetadata(),
Labels: meta.Config.GetLabels(),
Annotations: meta.Config.GetAnnotations(),
}

if stats != nil {
s, err := typeurl.UnmarshalAny(stats.Data)
if err != nil {
return fmt.Errorf("failed to extract container metrics: %v", err)
return nil, fmt.Errorf("failed to extract container metrics: %v", err)
}
metrics := s.(*cgroups.Metrics)
cs.Cpu = &runtime.CpuUsage{
Expand All @@ -126,36 +112,34 @@ func (c *criContainerdService) getContainerMetrics(
}
}

return nil
return &cs, nil
}

// buildTaskMetricsRequest constructs a tasks.MetricsRequest based on
// the information in the stats request and the containerStore
func (c *criContainerdService) buildTaskMetricsRequest(
r *runtime.ListContainerStatsRequest,
) (tasks.MetricsRequest, map[string]bool, error) {
) (tasks.MetricsRequest, []containerstore.Container, error) {
var req tasks.MetricsRequest
if r.GetFilter == nil {
if r.GetFilter() == nil {
return req, nil, nil
}

candidateContainers := make(map[string]bool)
for _, c := range c.containerStore.List() {
if r.Filter.GetId() != "" && c.ID != r.Filter.GetId() {
var containers []containerstore.Container
for _, cntr := range c.containerStore.List() {
if r.GetFilter().GetId() != "" && cntr.ID != r.GetFilter().GetId() {
continue
}
if r.Filter.GetPodSandboxId() != "" && c.SandboxID != r.Filter.GetPodSandboxId() {
if r.GetFilter().GetPodSandboxId() != "" && cntr.SandboxID != r.GetFilter().GetPodSandboxId() {
continue
}
if r.Filter.GetLabelSelector() != nil && !matchLabelSelector(r.Filter.GetLabelSelector(), c.Config.GetLabels()) {
if r.GetFilter().GetLabelSelector() != nil &&
!matchLabelSelector(r.GetFilter().GetLabelSelector(), cntr.Config.GetLabels()) {
continue
}
candidateContainers[c.ID] = true
}
for id := range candidateContainers {
req.Filters = append(req.Filters, "id=="+id)
containers = append(containers, cntr)
req.Filters = append(req.Filters, "id=="+cntr.ID)
}
return req, candidateContainers, nil
return req, containers, nil
}

func matchLabelSelector(selector, labels map[string]string) bool {
Expand Down