Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

## Unreleased
### New features
- Provide a new metric called kindling_k8s_workload_info, which supports workload filtering for k8s, thus preventing frequent crashes of Grafana topology. ([#530](https://github.com/KindlingProject/kindling/pull/530))
- Added support for displaying trace-profiling data by querying from Elasticsearch. ([#528](https://github.com/KindlingProject/kindling/pull/528))
- Display scheduler run queue latency on Trace-Profiling chart. To learn more about the concept of 'Run Queue Latency', refer to [this blog post](https://www.brendangregg.com/blog/2016-10-08/linux-bcc-runqlat.html). You can also find a use case for this feature in [this blog post](http://kindling.harmonycloud.cn/blogs/use-cases/optimize-cpu/). ([#494](https://github.com/KindlingProject/kindling/pull/494))
### Enhancements
Expand Down Expand Up @@ -178,3 +179,5 @@ In this release, we have a new contributor @llhhbc. Thanks and welcome! 🥳
- Support HTTP, MySQL, and REDIS request analysis.
- Provide a Grafana-plugin with four built-in dashboards to support basic analysis features.



5 changes: 5 additions & 0 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ analyzers:
- key: "rocketmq"
ports: [ 9876, 10911 ]
slow_threshold: 500
k8sinfoanalyzer:
# SendDataGroupInterval is the datagroup sending interval.
# The unit is seconds.
send_datagroup_interval: 15

processors:
k8smetadataprocessor:
Expand Down Expand Up @@ -192,6 +196,7 @@ exporters:
kindling_tcp_packet_loss_total: counter
kindling_tcp_connect_total: counter
kindling_tcp_connect_duration_nanoseconds_total: counter
kindling_k8s_workload_info: gauge
# Export data in the following ways: ["prometheus", "otlp", "stdout"]
# Note: configure the corresponding section to make everything ok
export_kind: prometheus
Expand Down
11 changes: 7 additions & 4 deletions collector/internal/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/cpuanalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/loganalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/k8sinfoanalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/noopanalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/tcpconnectanalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/tcpmetricanalyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
Expand Down Expand Up @@ -82,7 +83,8 @@ func (a *Application) registerFactory() {
a.componentsFactory.RegisterExporter(otelexporter.Otel, otelexporter.NewExporter, &otelexporter.Config{})
a.componentsFactory.RegisterAnalyzer(tcpmetricanalyzer.TcpMetric.String(), tcpmetricanalyzer.NewTcpMetricAnalyzer, &tcpmetricanalyzer.Config{})
a.componentsFactory.RegisterExporter(logexporter.Type, logexporter.New, &logexporter.Config{})
a.componentsFactory.RegisterAnalyzer(loganalyzer.Type.String(), loganalyzer.New, &loganalyzer.Config{})
a.componentsFactory.RegisterAnalyzer(noopanalyzer.Type.String(), noopanalyzer.New, &noopanalyzer.Config{})
a.componentsFactory.RegisterAnalyzer(k8sinfoanalyzer.Type.String(), k8sinfoanalyzer.New, k8sinfoanalyzer.NewDefaultConfig())
a.componentsFactory.RegisterProcessor(aggregateprocessor.Type, aggregateprocessor.New, aggregateprocessor.NewDefaultConfig())
a.componentsFactory.RegisterAnalyzer(tcpconnectanalyzer.Type.String(), tcpconnectanalyzer.New, tcpconnectanalyzer.NewDefaultConfig())
a.componentsFactory.RegisterExporter(cameraexporter.Type, cameraexporter.New, cameraexporter.NewDefaultConfig())
Expand Down Expand Up @@ -134,9 +136,10 @@ func (a *Application) buildPipeline() error {

cpuAnalyzerFactory := a.componentsFactory.Analyzers[cpuanalyzer.CpuProfile.String()]
cpuAnalyzer := cpuAnalyzerFactory.NewFunc(cpuAnalyzerFactory.Config, a.telemetry.GetTelemetryTools(cpuanalyzer.CpuProfile.String()), []consumer.Consumer{cameraExporter})

k8sInfoAnalyzerFactory := a.componentsFactory.Analyzers[k8sinfoanalyzer.Type.String()]
k8sInfoAnalyzer := k8sInfoAnalyzerFactory.NewFunc(k8sInfoAnalyzerFactory.Config, a.telemetry.GetTelemetryTools(k8sinfoanalyzer.Type.String()), []consumer.Consumer{otelExporter})
// Initialize receiver packaged with multiple analyzers
analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer, tcpConnectAnalyzer, cpuAnalyzer)
analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer, tcpConnectAnalyzer, cpuAnalyzer, k8sInfoAnalyzer)
if err != nil {
return fmt.Errorf("error happened while creating analyzer manager: %w", err)
}
Expand Down
32 changes: 32 additions & 0 deletions collector/pkg/component/analyzer/k8sinfoanalyzer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# K8sInfo Analyzer

K8sInfoAnalyzer is a component that analyzes the received k8s workload information: it receives `model.DataGroup`
from metadata and sends it to the exporter.
## Configuration
See [config.go](./config.go) for the config specification.

## Received Data (Input)

The `DataGroup` contains the following fields:
- `Name` is always `k8s_workload_metric_group`.
- `Labels` contains the following fields:
- `namespace`: The namespace of the workload.
- `workload_name`: The name of the workload.
- `workload_kind`: The kind of the workload.
- `Metrics` contains the following fields:
- `kindling_k8s_workload_info`

An example is as follows.
```json
{
    "Name": "k8s_workload_metric_group",
    "Metrics": {
        "kindling_k8s_workload_info": 1
    },
    "Labels": {
        "namespace": "kindling",
        "workload_kind": "Deployment",
        "workload_name": "testdemo1"
    }
}
```
13 changes: 13 additions & 0 deletions collector/pkg/component/analyzer/k8sinfoanalyzer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package k8sinfoanalyzer

// Config holds the user-tunable settings of the k8sinfoanalyzer component.
type Config struct {
	// SendDataGroupInterval is the datagroup sending interval.
	// The unit is seconds.
	// It is bound from the `send_datagroup_interval` YAML key via mapstructure.
	SendDataGroupInterval int `mapstructure:"send_datagroup_interval"`
}

// NewDefaultConfig returns a Config populated with the default values:
// workload data groups are sent every 15 seconds.
func NewDefaultConfig() *Config {
	cfg := new(Config)
	cfg.SendDataGroupInterval = 15
	return cfg
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package k8sinfoanalyzer

import (
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"go.uber.org/zap/zapcore"
)

const Type analyzer.Type = "k8sinfoanalyzer"

// K8sInfoAnalyzer periodically fetches k8s workload metadata and forwards it
// as DataGroups to the configured downstream consumers.
type K8sInfoAnalyzer struct {
	cfg *Config
	// nextConsumers receive every workload DataGroup on each tick.
	nextConsumers []consumer.Consumer
	telemetry     *component.TelemetryTools
	// stopProfileChan signals the background sender goroutine to exit; it is
	// created in Start and closed in Shutdown.
	stopProfileChan chan struct{}
}

// New builds a K8sInfoAnalyzer from the supplied configuration, telemetry
// tools, and downstream consumers. It panics via the telemetry logger when
// cfg is not a *Config, mirroring the other analyzer constructors.
func New(cfg interface{}, telemetry *component.TelemetryTools, consumer []consumer.Consumer) analyzer.Analyzer {
	conf, ok := cfg.(*Config)
	if !ok {
		telemetry.Logger.Panic("Cannot convert k8sinfoanalyzer config")
	}
	instance := new(K8sInfoAnalyzer)
	instance.cfg = conf
	instance.nextConsumers = consumer
	instance.telemetry = telemetry
	return instance
}

// sendToNextConsumer runs in its own goroutine (started by Start). Every
// SendDataGroupInterval seconds it fetches the current workload DataGroups
// from the kubernetes metadata cache and pushes each one to every downstream
// consumer, until stopProfileChan is closed.
func (a *K8sInfoAnalyzer) sendToNextConsumer() {
	ticker := time.NewTicker(time.Duration(a.cfg.SendDataGroupInterval) * time.Second)
	// Fix: the original never stopped the ticker, leaking it after Shutdown.
	defer ticker.Stop()
	for {
		select {
		case <-a.stopProfileChan:
			return
		case <-ticker.C:
			dataGroups := kubernetes.GetWorkloadDataGroup()
			for _, nextConsumer := range a.nextConsumers {
				for _, dataGroup := range dataGroups {
					// NOTE(review): Consume's result is discarded here, as in the
					// original; consider logging a returned error — TODO confirm
					// the Consumer interface's error semantics.
					nextConsumer.Consume(dataGroup)
					// Check the level first so the label lookup is skipped
					// entirely when debug logging is disabled.
					if ce := a.telemetry.Logger.Check(zapcore.DebugLevel, ""); ce != nil {
						a.telemetry.Logger.Debug("K8sInfoAnalyzer send to consumer workload name=:\n" +
							dataGroup.Labels.GetStringValue(constlabels.WorkloadName))
					}
				}
			}
		}
	}
}

// Start creates the stop channel and launches the background goroutine that
// periodically sends workload data to the consumers. It always returns nil.
func (a *K8sInfoAnalyzer) Start() error {
	a.stopProfileChan = make(chan struct{})
	go a.sendToNextConsumer()
	return nil
}

// ConsumeEvent is a no-op: this analyzer is timer-driven and does not consume
// kernel events (see ConsumableEvents, which returns nil).
func (a *K8sInfoAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
	return nil
}

// Shutdown closes the stop channel, signaling the sender goroutine started by
// Start to exit. It must not be called before Start, and only once.
func (a *K8sInfoAnalyzer) Shutdown() error {
	close(a.stopProfileChan)
	return nil
}

// Type returns the registered analyzer type name ("k8sinfoanalyzer").
func (a *K8sInfoAnalyzer) Type() analyzer.Type {
	return Type
}

// ConsumableEvents returns nil because this analyzer subscribes to no kernel
// events; its data comes from the kubernetes metadata cache on a timer.
func (a *K8sInfoAnalyzer) ConsumableEvents() []string {
	return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package loganalyzer
package noopanalyzer

import (
"fmt"
Expand All @@ -10,9 +10,9 @@ import (
"go.uber.org/zap/zapcore"
)

const Type analyzer.Type = "loganalyzer"
const Type analyzer.Type = "noopanalyzer"

type LogAnalyzer struct {
type NoopAnalyzer struct {
cfg *Config
nextConsumers []consumer.Consumer
telemetry *component.TelemetryTools
Expand All @@ -21,20 +21,20 @@ type LogAnalyzer struct {
func New(cfg interface{}, telemetry *component.TelemetryTools, consumer []consumer.Consumer) analyzer.Analyzer {
config, ok := cfg.(*Config)
if !ok {
telemetry.Logger.Panic("Cannot convert loganalyzer config")
telemetry.Logger.Panic("Cannot convert noopanalyzer config")
}
return &LogAnalyzer{
return &NoopAnalyzer{
cfg: config,
nextConsumers: consumer,
telemetry: telemetry,
}
}

func (a *LogAnalyzer) Start() error {
func (a *NoopAnalyzer) Start() error {
return nil
}

func (a *LogAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
func (a *NoopAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
if ce := a.telemetry.Logger.Check(zapcore.InfoLevel, ""); ce != nil {
a.telemetry.Logger.Debug(fmt.Sprintf("Receive event: %+v", event))
}
Expand All @@ -44,15 +44,15 @@ func (a *LogAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
return nil
}

func (a *LogAnalyzer) Shutdown() error {
func (a *NoopAnalyzer) Shutdown() error {
return nil
}

func (a *LogAnalyzer) Type() analyzer.Type {
func (a *NoopAnalyzer) Type() analyzer.Type {
return Type
}

func (a *LogAnalyzer) ConsumableEvents() []string {
func (a *NoopAnalyzer) ConsumableEvents() []string {
return []string{analyzer.ConsumeAllEvents}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type instrumentFactory struct {

traceAsMetricSelector *aggregator.LabelSelectors
TcpRttMillsSelector *aggregator.LabelSelectors
K8sWorkloadSelector *aggregator.LabelSelectors
}

func newInstrumentFactory(meter metric.Meter, telemetry *component.TelemetryTools, customLabels []attribute.KeyValue) *instrumentFactory {
Expand All @@ -47,11 +48,15 @@ func newInstrumentFactory(meter metric.Meter, telemetry *component.TelemetryTool
constnames.TraceAsMetric: {
{Kind: defaultaggregator.LastKind, OutputName: constnames.TraceAsMetric},
},
constnames.K8sWorkLoadMetricName: {
{Kind: defaultaggregator.LastKind, OutputName: constnames.K8sWorkLoadMetricName},
},
},
}),

traceAsMetricSelector: newTraceAsMetricSelectors(),
TcpRttMillsSelector: newTcpRttMicroSecondsSelectors(),
K8sWorkloadSelector: newK8sWorkloadSelector(),
}
}
func (i *instrumentFactory) getInstrument(metricName string, kind MetricAggregationKind) instrument {
Expand Down Expand Up @@ -119,6 +124,8 @@ func (i *instrumentFactory) getSelector(metricName string) *aggregator.LabelSele
return i.traceAsMetricSelector
case constnames.TcpRttMetricName:
return i.TcpRttMillsSelector
case constnames.K8sWorkLoadMetricName:
return i.K8sWorkloadSelector
default:
return nil
}
Expand Down Expand Up @@ -190,6 +197,14 @@ func newTcpRttMicroSecondsSelectors() *aggregator.LabelSelectors {
)
}

// newK8sWorkloadSelector builds the label selectors used to aggregate the
// kindling_k8s_workload_info metric: namespace, workload kind, and workload name.
func newK8sWorkloadSelector() *aggregator.LabelSelectors {
	selectors := []aggregator.LabelSelector{
		{Name: constlabels.Namespace, VType: aggregator.StringType},
		{Name: constlabels.WorkloadKind, VType: aggregator.StringType},
		{Name: constlabels.WorkloadName, VType: aggregator.StringType},
	}
	return aggregator.NewLabelSelectors(selectors...)
}

type instrument interface {
Measurement(value int64) metric.Measurement
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP,
}),
adapter.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName,
constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels),
constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName, constnames.K8sWorkloadMetricGroupName},
customLabels),
},
}
go func() {
Expand Down Expand Up @@ -211,7 +212,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP,
}),
adapter.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName,
constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels),
constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName, constnames.K8sWorkloadMetricGroupName},
customLabels),
},
}

Expand Down
5 changes: 4 additions & 1 deletion collector/pkg/metadata/kubernetes/pod_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha

func deletePodInfo(podInfo *deletedPodInfo) {
if podInfo.name != "" {
globalPodInfo.delete(podInfo.namespace, podInfo.name)
deletePodInfo, ok := globalPodInfo.delete(podInfo.namespace, podInfo.name)
if ok {
globalWorkload.delete(deletePodInfo.Namespace, deletePodInfo.WorkloadName)
}
}
if len(podInfo.containerIds) != 0 {
for i := 0; i < len(podInfo.containerIds); i++ {
Expand Down
Loading