Skip to content

Commit 31cb394

Browse files
committed
federated targets functionality (#1375)
Signed-off-by: Alexander Tunik <2braven@gmail.com>
1 parent 03c7747 commit 31cb394

25 files changed

+3032
-17
lines changed

cmd/thanos/query.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
4747
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4848
"github.com/thanos-io/thanos/pkg/store"
49+
"github.com/thanos-io/thanos/pkg/targets"
4950
"github.com/thanos-io/thanos/pkg/tls"
5051
"github.com/thanos-io/thanos/pkg/ui"
5152
)
@@ -103,6 +104,10 @@ func registerQuery(app *extkingpin.App) {
103104
metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
104105
Hidden().PlaceHolder("<metadata>").Strings()
105106

107+
// TODO(atunik): Hidden because we plan to extract discovery to a separate API: https://github.com/thanos-io/thanos/issues/2600.
108+
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
109+
Hidden().PlaceHolder("<target>").Strings()
110+
106111
strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
107112
PlaceHolder("<staticstore>").Strings()
108113

@@ -130,6 +135,9 @@ func registerQuery(app *extkingpin.App) {
130135
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
131136
Hidden().Default("true").Bool()
132137

138+
enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling.").
139+
Hidden().Default("true").Bool()
140+
133141
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
134142
Hidden().Default("true").Bool()
135143

@@ -169,6 +177,10 @@ func registerQuery(app *extkingpin.App) {
169177
return errors.Wrap(err, "error while parsing config for request logging")
170178
}
171179

180+
if dup := firstDuplicate(*targetEndpoints); dup != "" {
181+
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
182+
}
183+
172184
var fileSD *file.Discovery
173185
if len(*fileSDFiles) > 0 {
174186
conf := &file.SDConfig{
@@ -222,10 +234,12 @@ func registerQuery(app *extkingpin.App) {
222234
getFlagsMap(cmd.Flags()),
223235
*stores,
224236
*ruleEndpoints,
237+
*targetEndpoints,
225238
*metadataEndpoints,
226239
*enableAutodownsampling,
227240
*enableQueryPartialResponse,
228241
*enableRulePartialResponse,
242+
*enableTargetPartialResponse,
229243
*enableMetricMetadataPartialResponse,
230244
fileSD,
231245
time.Duration(*dnsSDInterval),
@@ -278,10 +292,12 @@ func runQuery(
278292
flagsMap map[string]string,
279293
storeAddrs []string,
280294
ruleAddrs []string,
295+
targetAddrs []string,
281296
metadataAddrs []string,
282297
enableAutodownsampling bool,
283298
enableQueryPartialResponse bool,
284299
enableRulePartialResponse bool,
300+
enableTargetPartialResponse bool,
285301
enableMetricMetadataPartialResponse bool,
286302
fileSD *file.Discovery,
287303
dnsSDInterval time.Duration,
@@ -323,6 +339,12 @@ func runQuery(
323339
dns.ResolverType(dnsSDResolver),
324340
)
325341

342+
dnsTargetProvider := dns.NewProvider(
343+
logger,
344+
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
345+
dns.ResolverType(dnsSDResolver),
346+
)
347+
326348
dnsMetadataProvider := dns.NewProvider(
327349
logger,
328350
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
@@ -355,6 +377,13 @@ func runQuery(
355377

356378
return specs
357379
},
380+
func() (specs []query.TargetSpec) {
381+
for _, addr := range dnsTargetProvider.Addresses() {
382+
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
383+
}
384+
385+
return specs
386+
},
358387
func() (specs []query.MetadataSpec) {
359388
for _, addr := range dnsMetadataProvider.Addresses() {
360389
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
@@ -367,6 +396,7 @@ func runQuery(
367396
)
368397
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
369398
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
399+
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
370400
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
371401
queryableCreator = query.NewQueryableCreator(
372402
logger,
@@ -454,6 +484,9 @@ func runQuery(
454484
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
455485
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
456486
}
487+
// Resolve target API addresses with resolveCtx, the same context the
// neighbouring rule and metadata providers are resolved with, rather than
// the outer ctx.
if err := dnsTargetProvider.Resolve(resolveCtx, targetAddrs); err != nil {
	level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
457490
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
458491
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
459492
}
@@ -504,10 +537,12 @@ func runQuery(
504537
queryableCreator,
505538
// NOTE: Will share the same replica label as the query for now.
506539
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
540+
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
507541
metadata.NewGRPCClient(metadataProxy),
508542
enableAutodownsampling,
509543
enableQueryPartialResponse,
510544
enableRulePartialResponse,
545+
enableTargetPartialResponse,
511546
enableMetricMetadataPartialResponse,
512547
queryReplicaLabels,
513548
flagsMap,
@@ -550,6 +585,7 @@ func runQuery(
550585
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
551586
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
552587
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
588+
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
553589
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
554590
grpcserver.WithListen(grpcBindAddr),
555591
grpcserver.WithGracePeriod(grpcGracePeriod),

cmd/thanos/sidecar.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4343
"github.com/thanos-io/thanos/pkg/shipper"
4444
"github.com/thanos-io/thanos/pkg/store"
45+
"github.com/thanos-io/thanos/pkg/targets"
4546
"github.com/thanos-io/thanos/pkg/tls"
4647
"github.com/thanos-io/thanos/pkg/tracing"
4748
)
@@ -229,6 +230,7 @@ func runSidecar(
229230
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
230231
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
231232
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
233+
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
232234
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
233235
grpcserver.WithListen(conf.grpc.bindAddress),
234236
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),

pkg/api/query/v1.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ import (
5050
"github.com/thanos-io/thanos/pkg/rules/rulespb"
5151
"github.com/thanos-io/thanos/pkg/runutil"
5252
"github.com/thanos-io/thanos/pkg/store/storepb"
53+
"github.com/thanos-io/thanos/pkg/targets"
54+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
5355
"github.com/thanos-io/thanos/pkg/tracing"
5456
)
5557

@@ -72,11 +74,13 @@ type QueryAPI struct {
7274
// queryEngine returns appropriate promql.Engine for a query with a given step.
7375
queryEngine func(int64) *promql.Engine
7476
ruleGroups rules.UnaryClient
77+
targets targets.UnaryClient
7578
metadatas metadata.UnaryClient
7679

7780
enableAutodownsampling bool
7881
enableQueryPartialResponse bool
7982
enableRulePartialResponse bool
83+
enableTargetPartialResponse bool
8084
enableMetricMetadataPartialResponse bool
8185
disableCORS bool
8286

@@ -95,10 +99,12 @@ func NewQueryAPI(
9599
qe func(int64) *promql.Engine,
96100
c query.QueryableCreator,
97101
ruleGroups rules.UnaryClient,
102+
targets targets.UnaryClient,
98103
metadatas metadata.UnaryClient,
99104
enableAutodownsampling bool,
100105
enableQueryPartialResponse bool,
101106
enableRulePartialResponse bool,
107+
enableTargetPartialResponse bool,
102108
enableMetricMetadataPartialResponse bool,
103109
replicaLabels []string,
104110
flagsMap map[string]string,
@@ -115,11 +121,13 @@ func NewQueryAPI(
115121
queryableCreate: c,
116122
gate: gate,
117123
ruleGroups: ruleGroups,
124+
targets: targets,
118125
metadatas: metadatas,
119126

120127
enableAutodownsampling: enableAutodownsampling,
121128
enableQueryPartialResponse: enableQueryPartialResponse,
122129
enableRulePartialResponse: enableRulePartialResponse,
130+
enableTargetPartialResponse: enableTargetPartialResponse,
123131
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
124132
replicaLabels: replicaLabels,
125133
storeSet: storeSet,
@@ -154,6 +162,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
154162

155163
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))
156164

165+
r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse)))
166+
157167
r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))
158168
}
159169

@@ -652,6 +662,36 @@ func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiErr
652662
return statuses, nil, nil
653663
}
654664

665+
func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
666+
ps := storepb.PartialResponseStrategy_ABORT
667+
if enablePartialResponse {
668+
ps = storepb.PartialResponseStrategy_WARN
669+
}
670+
671+
return func(r *http.Request) (interface{}, []error, *api.ApiError) {
672+
stateParam := r.URL.Query().Get("state")
673+
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
674+
if !ok {
675+
if stateParam != "" {
676+
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)}
677+
}
678+
state = int32(targetspb.TargetsRequest_ANY)
679+
}
680+
681+
req := &targetspb.TargetsRequest{
682+
State: targetspb.TargetsRequest_State(state),
683+
PartialResponseStrategy: ps,
684+
}
685+
686+
t, warnings, err := client.Targets(r.Context(), req)
687+
if err != nil {
688+
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}
689+
}
690+
691+
return t, warnings, nil
692+
}
693+
}
694+
655695
// NewRulesHandler creates a handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
656696
// which uses gRPC Unary Rules API.
657697
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {

pkg/promclient/promclient.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/thanos-io/thanos/pkg/rules/rulespb"
3636
"github.com/thanos-io/thanos/pkg/runutil"
3737
"github.com/thanos-io/thanos/pkg/store/storepb"
38+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
3839
"github.com/thanos-io/thanos/pkg/tracing"
3940
"google.golang.org/grpc/codes"
4041
yaml "gopkg.in/yaml.v2"
@@ -758,3 +759,19 @@ func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric strin
758759
}
759760
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v)
760761
}
762+
763+
func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
764+
u := *base
765+
u.Path = path.Join(u.Path, "/api/v1/targets")
766+
767+
if stateTargets != "" {
768+
q := u.Query()
769+
q.Add("state", stateTargets)
770+
u.RawQuery = q.Encode()
771+
}
772+
773+
var v struct {
774+
Data *targetspb.TargetDiscovery `json:"data"`
775+
}
776+
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/targets HTTP[client]", &u, &v)
777+
}

0 commit comments

Comments
 (0)