Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
1. All notable changes to this project will be documented in this file.
2. Records in this file are not identical to the title of their Pull Requests. A detailed description is necessary for understanding what changes are and why they are made.

## Unreleased
### Enhancements
- Add an option `enable_fetch_replicaset` to control whether to fetch ReplicaSet metadata. The default value is false which aims to release pressure on Kubernetes API server. ([#492](https://github.com/KindlingProject/kindling/pull/492))

### Bug fixes


## v0.7.1 - 2023-03-01
### New features
- Support trace-profiling sampling to reduce data output. One trace is sampled every five seconds for each endpoint by default. ([#446](https://github.com/KindlingProject/kindling/pull/446)[#462](https://github.com/KindlingProject/kindling/pull/462))
Expand Down
7 changes: 7 additions & 0 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ processors:
enable: true
kube_auth_type: kubeConfig
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
# Should not be lower than 30 seconds.
grace_delete_period: 60
# enable_fetch_replicaset controls whether to fetch ReplicaSet information.
# The default value is false. It should be enabled if the ReplicaSet
# is used to control pods in the third-party CRD except for Deployment.
enable_fetch_replicaset: false
aggregateprocessor:
# Aggregation duration window size. The unit is second.
ticker_interval: 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type Config struct {
// The unit is seconds, and the default value is 60 seconds.
// Should not be lower than 30 seconds.
GraceDeletePeriod int `mapstructure:"grace_delete_period"`
// EnableFetchReplicaSet controls whether to fetch ReplicaSet information.
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool `mapstructure:"enable_fetch_replicaset"`
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
}
}
var options []kubernetes.Option
options = append(options, kubernetes.WithAuthType(config.KubeAuthType))
options = append(options, kubernetes.WithKubeConfigDir(config.KubeConfigDir))
options = append(options, kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod))
options = append(options,
kubernetes.WithAuthType(config.KubeAuthType),
kubernetes.WithKubeConfigDir(config.KubeConfigDir),
kubernetes.WithGraceDeletePeriod(config.GraceDeletePeriod),
kubernetes.WithFetchReplicaSet(config.EnableFetchReplicaSet),
)
err := kubernetes.InitK8sHandler(options...)
if err != nil {
telemetry.Logger.Panicf("Failed to initialize [%s]: %v. Set the option 'enable' false if you want to run the agent in the non-Kubernetes environment.", K8sMetadata, err)
Expand Down
11 changes: 11 additions & 0 deletions collector/pkg/metadata/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type config struct {
// The unit is seconds, and the default value is 60 seconds.
// Should not be lower than 30 seconds.
GraceDeletePeriod time.Duration
// EnableFetchReplicaSet controls whether to fetch ReplicaSet information.
// The default value is false. It should be enabled if the ReplicaSet
// is used to control pods in the third-party CRD except for Deployment.
EnableFetchReplicaSet bool
}

type Option func(cfg *config)
Expand All @@ -36,3 +40,10 @@ func WithGraceDeletePeriod(interval int) Option {
cfg.GraceDeletePeriod = time.Duration(interval) * time.Second
}
}

// WithFetchReplicaSet sets whether to fetch ReplicaSet information.
func WithFetchReplicaSet(fetch bool) Option {
return func(cfg *config) {
cfg.EnableFetchReplicaSet = fetch
}
}
2 changes: 2 additions & 0 deletions collector/pkg/metadata/kubernetes/helper.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kubernetes

const DeploymentKind = "deployment"

// CompleteGVK returns the complete string of the workload kind.
// If apiVersion is not one of the built-in groupVersion(see scheme.go), return {apiVersion}-{kind};
// return {kind}, otherwise.
Expand Down
21 changes: 11 additions & 10 deletions collector/pkg/metadata/kubernetes/initk8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (
AuthTypeServiceAccount AuthType = "serviceAccount"
// AuthTypeKubeConfig uses local credentials like those used by kubectl.
AuthTypeKubeConfig AuthType = "kubeConfig"
// Default kubeconfig path
// DefaultKubeConfigPath Default kubeconfig path
DefaultKubeConfigPath string = "~/.kube/config"
// Default grace delete period is 60 seconds
DefaultGraceDeletePeriod time.Duration = time.Second * 60
// DefaultGraceDeletePeriod is 60 seconds
DefaultGraceDeletePeriod = time.Second * 60
)

var authTypes = map[AuthType]bool{
Expand Down Expand Up @@ -57,7 +57,6 @@ func (c APIConfig) Validate() error {

var (
MetaDataCache = New()
KubeClient *k8s.Clientset
once sync.Once
IsInitSuccess = false
)
Expand All @@ -66,9 +65,10 @@ func InitK8sHandler(options ...Option) error {
var retErr error
once.Do(func() {
k8sConfig := config{
KubeAuthType: AuthTypeKubeConfig,
KubeConfigDir: DefaultKubeConfigPath,
GraceDeletePeriod: DefaultGraceDeletePeriod,
KubeAuthType: AuthTypeKubeConfig,
KubeConfigDir: DefaultKubeConfigPath,
GraceDeletePeriod: DefaultGraceDeletePeriod,
EnableFetchReplicaSet: false,
}
for _, option := range options {
option(&k8sConfig)
Expand All @@ -82,13 +82,14 @@ func InitK8sHandler(options ...Option) error {
IsInitSuccess = true
go NodeWatch(clientSet)
time.Sleep(1 * time.Second)
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
if k8sConfig.EnableFetchReplicaSet {
go RsWatch(clientSet)
time.Sleep(1 * time.Second)
}
go ServiceWatch(clientSet)
time.Sleep(1 * time.Second)
go PodWatch(clientSet, k8sConfig.GraceDeletePeriod)
time.Sleep(1 * time.Second)
KubeClient = clientSet
})
return retErr
}
Expand Down
19 changes: 17 additions & 2 deletions collector/pkg/metadata/kubernetes/pod_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
_ "path/filepath"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -223,17 +224,31 @@ func getControllerKindName(pod *corev1.Pod) (workloadKind string, workloadName s
if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok {
workloadKind = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind))
workloadName = workload.Name
return
} else {
// If not found in 'globalRsInfo', just use 'Deployment' as the workload kind.
workloadKind = DeploymentKind
workloadName = extractDeploymentName(owner.Name)
}
return
}
// If the owner of pod is not ReplicaSet or the replicaset has no controller
// If the owner of pod is not ReplicaSet
workloadKind = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind))
workloadName = owner.Name
return
}
return
}

var rRegex = regexp.MustCompile(`^(.*)-[0-9a-z]+$`)

func extractDeploymentName(replicaSetName string) string {
parts := rRegex.FindStringSubmatch(replicaSetName)
if len(parts) == 2 {
return parts[1]
}
return ""
}

func OnUpdate(objOld interface{}, objNew interface{}) {
oldPod := objOld.(*corev1.Pod)
newPod := objNew.(*corev1.Pod)
Expand Down
20 changes: 20 additions & 0 deletions collector/pkg/metadata/kubernetes/pod_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,23 @@ func assertFindPod(t *testing.T, pod *corev1.Pod) {
_, find = MetaDataCache.GetContainerByHostIpPort(pod.Status.HostIP, uint32(pod.Spec.Containers[0].Ports[0].HostPort))
assert.True(t, find, "Didn't find the new HostIP Port in MetaDataCache")
}

func Test_extractDeploymentName(t *testing.T) {
type args struct {
replicaSetName string
}
tests := []struct {
name string
args args
want string
}{
{args: args{"nginx-deployment-75675f5897"}, want: "nginx-deployment"},
{args: args{"nginx-75675f5897"}, want: "nginx"},
{args: args{"nginx75675f5897"}, want: ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, extractDeploymentName(tt.args.replicaSetName), "extractDeploymentName(%v)", tt.args.replicaSetName)
})
}
}
7 changes: 7 additions & 0 deletions deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,14 @@ processors:
enable: true
kube_auth_type: serviceAccount
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
# Should not be lower than 30 seconds.
grace_delete_period: 60
# enable_fetch_replicaset controls whether to fetch ReplicaSet information.
# The default value is false. It should be enabled if the ReplicaSet
# is used to control pods in the third-party CRD except for Deployment.
enable_fetch_replicaset: false
aggregateprocessor:
# Aggregation duration window size. The unit is second.
ticker_interval: 5
Expand Down