Skip to content

Commit 328e178

Browse files
authored
Add labels as headers in gRPC connections (#1155)
1 parent a4918b3 commit 328e178

File tree

7 files changed

+92
-26
lines changed

7 files changed

+92
-26
lines changed

internal/command/command_plugin.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,28 +106,29 @@ func (cp *CommandPlugin) Info() *bus.Info {
106106

107107
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
108108
slog.DebugContext(ctx, "Processing command")
109+
ctxWithMetadata := cp.config.NewContextWithLabels(ctx)
109110

110-
if logger.ServerType(ctx) == "" {
111-
ctx = context.WithValue(
112-
ctx,
111+
if logger.ServerType(ctxWithMetadata) == "" {
112+
ctxWithMetadata = context.WithValue(
113+
ctxWithMetadata,
113114
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
114115
)
115116
}
116117

117-
if logger.ServerType(ctx) == cp.commandServerType.String() {
118+
if logger.ServerType(ctxWithMetadata) == cp.commandServerType.String() {
118119
switch msg.Topic {
119120
case bus.ConnectionResetTopic:
120-
cp.processConnectionReset(ctx, msg)
121+
cp.processConnectionReset(ctxWithMetadata, msg)
121122
case bus.ResourceUpdateTopic:
122-
cp.processResourceUpdate(ctx, msg)
123+
cp.processResourceUpdate(ctxWithMetadata, msg)
123124
case bus.InstanceHealthTopic:
124-
cp.processInstanceHealth(ctx, msg)
125+
cp.processInstanceHealth(ctxWithMetadata, msg)
125126
case bus.DataPlaneHealthResponseTopic:
126-
cp.processDataPlaneHealth(ctx, msg)
127+
cp.processDataPlaneHealth(ctxWithMetadata, msg)
127128
case bus.DataPlaneResponseTopic:
128-
cp.processDataPlaneResponse(ctx, msg)
129+
cp.processDataPlaneResponse(ctxWithMetadata, msg)
129130
default:
130-
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
131+
slog.DebugContext(ctxWithMetadata, "Command plugin received unknown topic", "topic", msg.Topic)
131132
}
132133
}
133134
}

internal/config/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func ResolveConfig() (*Config, error) {
129129
}
130130

131131
checkCollectorConfiguration(collector, config)
132+
addLabelsAsOTelHeaders(collector, config.Labels)
132133

133134
slog.Debug("Agent config", "config", config)
134135
slog.Info("Excluded files from being watched for file changes", "exclude_files",
@@ -209,6 +210,22 @@ func defaultCollector(collector *Collector, config *Config) {
209210
}
210211
}
211212

213+
func addLabelsAsOTelHeaders(collector *Collector, labels map[string]any) {
214+
slog.Debug("Adding labels as headers to collector", "labels", labels)
215+
if collector.Extensions.HeadersSetter != nil {
216+
for key, value := range labels {
217+
valueString, ok := value.(string)
218+
if ok {
219+
collector.Extensions.HeadersSetter.Headers = append(collector.Extensions.HeadersSetter.Headers, Header{
220+
Action: "insert",
221+
Key: key,
222+
Value: valueString,
223+
})
224+
}
225+
}
226+
}
227+
}
228+
212229
func setVersion(version, commit string) {
213230
RootCommand.Version = version + "-" + commit
214231
viperInstance.SetDefault(VersionKey, version)

internal/config/config_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"os"
1111
"path"
12+
"sort"
1213
"strings"
1314
"testing"
1415
"time"
@@ -63,6 +64,10 @@ func TestResolveConfig(t *testing.T) {
6364

6465
actual, err := ResolveConfig()
6566
require.NoError(t, err)
67+
sort.Slice(actual.Collector.Extensions.HeadersSetter.Headers, func(i, j int) bool {
68+
headers := actual.Collector.Extensions.HeadersSetter.Headers
69+
return headers[i].Key < headers[j].Key
70+
})
6671
assert.Equal(t, createConfig(), actual)
6772
}
6873

@@ -1059,6 +1064,16 @@ func createConfig() *Config {
10591064
Key: "key",
10601065
Value: "value",
10611066
},
1067+
{
1068+
Action: "insert",
1069+
Key: "label1",
1070+
Value: "label 1",
1071+
},
1072+
{
1073+
Action: "insert",
1074+
Key: "label2",
1075+
Value: "new-value",
1076+
},
10621077
},
10631078
},
10641079
},

internal/config/types.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
package config
77

88
import (
9+
"context"
910
"errors"
1011
"fmt"
1112
"path/filepath"
1213
"strings"
1314
"time"
1415

16+
"google.golang.org/grpc/metadata"
17+
1518
"github.com/google/uuid"
1619
)
1720

@@ -432,6 +435,18 @@ func (c *Config) AreReceiversConfigured() bool {
432435
len(c.Collector.Receivers.TcplogReceivers) > 0
433436
}
434437

438+
func (c *Config) NewContextWithLabels(ctx context.Context) context.Context {
439+
md := metadata.Pairs()
440+
for key, value := range c.Labels {
441+
valueString, ok := value.(string)
442+
if ok {
443+
md.Set(key, valueString)
444+
}
445+
}
446+
447+
return metadata.NewOutgoingContext(ctx, md)
448+
}
449+
435450
func isAllowedDir(dir string, allowedDirs []string) bool {
436451
if !strings.HasSuffix(dir, "/") && filepath.Ext(dir) == "" {
437452
dir += "/"

internal/file/file_plugin.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,34 +83,36 @@ func (fp *FilePlugin) Info() *bus.Info {
8383

8484
// nolint: cyclop, revive
8585
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
86+
ctxWithMetadata := fp.config.NewContextWithLabels(ctx)
87+
8688
if logger.ServerType(ctx) == "" {
87-
ctx = context.WithValue(
88-
ctx,
89+
ctxWithMetadata = context.WithValue(
90+
ctxWithMetadata,
8991
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()),
9092
)
9193
}
9294

93-
if logger.ServerType(ctx) == fp.serverType.String() {
95+
if logger.ServerType(ctxWithMetadata) == fp.serverType.String() {
9496
switch msg.Topic {
9597
case bus.ConnectionResetTopic:
96-
fp.handleConnectionReset(ctx, msg)
98+
fp.handleConnectionReset(ctxWithMetadata, msg)
9799
case bus.ConnectionCreatedTopic:
98-
slog.DebugContext(ctx, "File plugin received connection created message")
100+
slog.DebugContext(ctxWithMetadata, "File plugin received connection created message")
99101
fp.fileManagerService.SetIsConnected(true)
100102
case bus.NginxConfigUpdateTopic:
101-
fp.handleNginxConfigUpdate(ctx, msg)
103+
fp.handleNginxConfigUpdate(ctxWithMetadata, msg)
102104
case bus.ConfigUploadRequestTopic:
103-
fp.handleConfigUploadRequest(ctx, msg)
105+
fp.handleConfigUploadRequest(ctxWithMetadata, msg)
104106
case bus.ConfigApplyRequestTopic:
105-
fp.handleConfigApplyRequest(ctx, msg)
107+
fp.handleConfigApplyRequest(ctxWithMetadata, msg)
106108
case bus.ConfigApplyCompleteTopic:
107-
fp.handleConfigApplyComplete(ctx, msg)
109+
fp.handleConfigApplyComplete(ctxWithMetadata, msg)
108110
case bus.ConfigApplySuccessfulTopic:
109-
fp.handleConfigApplySuccess(ctx, msg)
111+
fp.handleConfigApplySuccess(ctxWithMetadata, msg)
110112
case bus.ConfigApplyFailedTopic:
111-
fp.handleConfigApplyFailedRequest(ctx, msg)
113+
fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg)
112114
default:
113-
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
115+
slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic)
114116
}
115117
}
116118
}

test/mock/collector/nginx-agent.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ allowed_directories:
2626
- /usr/local/etc/nginx
2727
- /usr/share/nginx/modules
2828
- /var/run/nginx
29+
30+
labels:
31+
product-type: mock-product
32+
product-version: v1.0.0
2933

3034
client:
3135
http:

test/mock/grpc/mock_management_server.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ var (
5252
Time: keepAliveTime,
5353
Timeout: keepAliveTimeout,
5454
}
55+
56+
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
57+
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
5558
)
5659

5760
type MockManagementServer struct {
@@ -146,13 +149,15 @@ func serverOptions(agentConfig *config.Config) []grpc.ServerOption {
146149
opts = append(opts, grpc.ChainUnaryInterceptor(
147150
grpcvalidator.UnaryServerInterceptor(),
148151
protovalidateInterceptor.UnaryServerInterceptor(validator),
152+
logHeaders,
149153
),
150154
)
151155
} else {
152156
opts = append(opts, grpc.ChainUnaryInterceptor(
153157
grpcvalidator.UnaryServerInterceptor(),
154158
protovalidateInterceptor.UnaryServerInterceptor(validator),
155159
ensureValidToken,
160+
logHeaders,
156161
),
157162
)
158163
}
@@ -242,10 +247,6 @@ func reportHealth(healthcheck *health.Server, agentConfig *config.Config) {
242247
}
243248

244249
func ensureValidToken(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
245-
var (
246-
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
247-
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
248-
)
249250
md, ok := metadata.FromIncomingContext(ctx)
250251
if !ok {
251252
return nil, errMissingMetadata
@@ -270,3 +271,14 @@ func valid(authorization []string) bool {
270271
// for a token matching an arbitrary string.
271272
return token == "1234"
272273
}
274+
275+
func logHeaders(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
276+
md, ok := metadata.FromIncomingContext(ctx)
277+
if !ok {
278+
return nil, errMissingMetadata
279+
}
280+
281+
slog.InfoContext(ctx, "Request headers", "headers", md)
282+
283+
return handler(ctx, req)
284+
}

0 commit comments

Comments
 (0)