Skip to content

Commit ab6f27c

Browse files
committed
federated targets functionality (#1375)
Signed-off-by: Alexander Tunik <2braven@gmail.com>
1 parent 5aa5e41 commit ab6f27c

27 files changed

+3036
-17
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
1212

1313
## Unreleased
1414

15+
- [#3350](https://github.com/thanos-io/thanos/pull/3350) Query/Sidecar: Added targets API support. You can now configure you Querier to fetch Prometheus targets from leaf Prometheus-es!
16+
1517
### Added
1618

1719
- [#3977](https://github.com/thanos-io/thanos/pull/3903) Expose exemplars for `http_request_duration_seconds` histogram if tracing is enabled.

cmd/thanos/query.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
4848
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4949
"github.com/thanos-io/thanos/pkg/store"
50+
"github.com/thanos-io/thanos/pkg/targets"
5051
"github.com/thanos-io/thanos/pkg/tls"
5152
"github.com/thanos-io/thanos/pkg/ui"
5253
)
@@ -108,6 +109,10 @@ func registerQuery(app *extkingpin.App) {
108109
exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
109110
Hidden().PlaceHolder("<exemplar>").Strings()
110111

112+
// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
113+
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
114+
Hidden().PlaceHolder("<target>").Strings()
115+
111116
strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
112117
PlaceHolder("<staticstore>").Strings()
113118

@@ -135,6 +140,9 @@ func registerQuery(app *extkingpin.App) {
135140
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
136141
Hidden().Default("true").Bool()
137142

143+
enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling.").
144+
Hidden().Default("true").Bool()
145+
138146
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
139147
Hidden().Default("true").Bool()
140148

@@ -178,6 +186,10 @@ func registerQuery(app *extkingpin.App) {
178186
return errors.Wrap(err, "error while parsing config for request logging")
179187
}
180188

189+
if dup := firstDuplicate(*targetEndpoints); dup != "" {
190+
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
191+
}
192+
181193
var fileSD *file.Discovery
182194
if len(*fileSDFiles) > 0 {
183195
conf := &file.SDConfig{
@@ -232,11 +244,13 @@ func registerQuery(app *extkingpin.App) {
232244
getFlagsMap(cmd.Flags()),
233245
*stores,
234246
*ruleEndpoints,
247+
*targetEndpoints,
235248
*metadataEndpoints,
236249
*exemplarEndpoints,
237250
*enableAutodownsampling,
238251
*enableQueryPartialResponse,
239252
*enableRulePartialResponse,
253+
*enableTargetPartialResponse,
240254
*enableMetricMetadataPartialResponse,
241255
fileSD,
242256
time.Duration(*dnsSDInterval),
@@ -290,11 +304,13 @@ func runQuery(
290304
flagsMap map[string]string,
291305
storeAddrs []string,
292306
ruleAddrs []string,
307+
targetAddrs []string,
293308
metadataAddrs []string,
294309
exemplarAddrs []string,
295310
enableAutodownsampling bool,
296311
enableQueryPartialResponse bool,
297312
enableRulePartialResponse bool,
313+
enableTargetPartialResponse bool,
298314
enableMetricMetadataPartialResponse bool,
299315
fileSD *file.Discovery,
300316
dnsSDInterval time.Duration,
@@ -336,6 +352,12 @@ func runQuery(
336352
dns.ResolverType(dnsSDResolver),
337353
)
338354

355+
dnsTargetProvider := dns.NewProvider(
356+
logger,
357+
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
358+
dns.ResolverType(dnsSDResolver),
359+
)
360+
339361
dnsMetadataProvider := dns.NewProvider(
340362
logger,
341363
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
@@ -374,6 +396,13 @@ func runQuery(
374396

375397
return specs
376398
},
399+
func() (specs []query.TargetSpec) {
400+
for _, addr := range dnsTargetProvider.Addresses() {
401+
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
402+
}
403+
404+
return specs
405+
},
377406
func() (specs []query.MetadataSpec) {
378407
for _, addr := range dnsMetadataProvider.Addresses() {
379408
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
@@ -393,6 +422,7 @@ func runQuery(
393422
)
394423
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
395424
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
425+
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
396426
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
397427
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
398428
queryableCreator = query.NewQueryableCreator(
@@ -481,6 +511,9 @@ func runQuery(
481511
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
482512
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
483513
}
514+
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
515+
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
516+
}
484517
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
485518
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
486519
}
@@ -534,11 +567,13 @@ func runQuery(
534567
queryableCreator,
535568
// NOTE: Will share the same replica label as the query for now.
536569
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
570+
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
537571
metadata.NewGRPCClient(metadataProxy),
538572
exemplars.NewGRPCClientWithDedup(exemplarsProxy, queryReplicaLabels),
539573
enableAutodownsampling,
540574
enableQueryPartialResponse,
541575
enableRulePartialResponse,
576+
enableTargetPartialResponse,
542577
enableMetricMetadataPartialResponse,
543578
queryReplicaLabels,
544579
flagsMap,
@@ -581,6 +616,7 @@ func runQuery(
581616
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
582617
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
583618
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
619+
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
584620
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
585621
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarsProxy)),
586622
grpcserver.WithListen(grpcBindAddr),

cmd/thanos/sidecar.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4444
"github.com/thanos-io/thanos/pkg/shipper"
4545
"github.com/thanos-io/thanos/pkg/store"
46+
"github.com/thanos-io/thanos/pkg/targets"
4647
"github.com/thanos-io/thanos/pkg/tls"
4748
"github.com/thanos-io/thanos/pkg/tracing"
4849
)
@@ -230,6 +231,7 @@ func runSidecar(
230231
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
231232
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
232233
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
234+
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
233235
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
234236
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
235237
grpcserver.WithListen(conf.grpc.bindAddress),

pkg/api/query/v1.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ import (
5353
"github.com/thanos-io/thanos/pkg/rules/rulespb"
5454
"github.com/thanos-io/thanos/pkg/runutil"
5555
"github.com/thanos-io/thanos/pkg/store/storepb"
56+
"github.com/thanos-io/thanos/pkg/targets"
57+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
5658
"github.com/thanos-io/thanos/pkg/tracing"
5759
)
5860

@@ -75,12 +77,14 @@ type QueryAPI struct {
7577
// queryEngine returns appropriate promql.Engine for a query with a given step.
7678
queryEngine func(int64) *promql.Engine
7779
ruleGroups rules.UnaryClient
80+
targets targets.UnaryClient
7881
metadatas metadata.UnaryClient
7982
exemplars exemplars.UnaryClient
8083

8184
enableAutodownsampling bool
8285
enableQueryPartialResponse bool
8386
enableRulePartialResponse bool
87+
enableTargetPartialResponse bool
8488
enableMetricMetadataPartialResponse bool
8589
enableExemplarPartialResponse bool
8690
disableCORS bool
@@ -100,11 +104,13 @@ func NewQueryAPI(
100104
qe func(int64) *promql.Engine,
101105
c query.QueryableCreator,
102106
ruleGroups rules.UnaryClient,
107+
targets targets.UnaryClient,
103108
metadatas metadata.UnaryClient,
104109
exemplars exemplars.UnaryClient,
105110
enableAutodownsampling bool,
106111
enableQueryPartialResponse bool,
107112
enableRulePartialResponse bool,
113+
enableTargetPartialResponse bool,
108114
enableMetricMetadataPartialResponse bool,
109115
replicaLabels []string,
110116
flagsMap map[string]string,
@@ -121,12 +127,14 @@ func NewQueryAPI(
121127
queryableCreate: c,
122128
gate: gate,
123129
ruleGroups: ruleGroups,
130+
targets: targets,
124131
metadatas: metadatas,
125132
exemplars: exemplars,
126133

127134
enableAutodownsampling: enableAutodownsampling,
128135
enableQueryPartialResponse: enableQueryPartialResponse,
129136
enableRulePartialResponse: enableRulePartialResponse,
137+
enableTargetPartialResponse: enableTargetPartialResponse,
130138
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
131139
replicaLabels: replicaLabels,
132140
storeSet: storeSet,
@@ -161,6 +169,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
161169

162170
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))
163171

172+
r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse)))
173+
164174
r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))
165175

166176
r.Get("/query_exemplars", instr("exemplars", NewExemplarsHandler(qapi.exemplars, qapi.enableExemplarPartialResponse)))
@@ -662,6 +672,36 @@ func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiErr
662672
return statuses, nil, nil
663673
}
664674

675+
func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
676+
ps := storepb.PartialResponseStrategy_ABORT
677+
if enablePartialResponse {
678+
ps = storepb.PartialResponseStrategy_WARN
679+
}
680+
681+
return func(r *http.Request) (interface{}, []error, *api.ApiError) {
682+
stateParam := r.URL.Query().Get("state")
683+
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
684+
if !ok {
685+
if stateParam != "" {
686+
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)}
687+
}
688+
state = int32(targetspb.TargetsRequest_ANY)
689+
}
690+
691+
req := &targetspb.TargetsRequest{
692+
State: targetspb.TargetsRequest_State(state),
693+
PartialResponseStrategy: ps,
694+
}
695+
696+
t, warnings, err := client.Targets(r.Context(), req)
697+
if err != nil {
698+
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}
699+
}
700+
701+
return t, warnings, nil
702+
}
703+
}
704+
665705
// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
666706
// which uses gRPC Unary Rules API.
667707
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {

pkg/promclient/promclient.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/thanos-io/thanos/pkg/rules/rulespb"
3737
"github.com/thanos-io/thanos/pkg/runutil"
3838
"github.com/thanos-io/thanos/pkg/store/storepb"
39+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
3940
"github.com/thanos-io/thanos/pkg/tracing"
4041
"google.golang.org/grpc/codes"
4142
yaml "gopkg.in/yaml.v2"
@@ -782,3 +783,19 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin
782783

783784
return m.Data, nil
784785
}
786+
787+
func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
788+
u := *base
789+
u.Path = path.Join(u.Path, "/api/v1/targets")
790+
791+
if stateTargets != "" {
792+
q := u.Query()
793+
q.Add("state", stateTargets)
794+
u.RawQuery = q.Encode()
795+
}
796+
797+
var v struct {
798+
Data *targetspb.TargetDiscovery `json:"data"`
799+
}
800+
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/targets HTTP[client]", &u, &v)
801+
}

0 commit comments

Comments
 (0)