Skip to content

Commit 0db9145

Browse files
committed
federated targets functionality (#1375)
Signed-off-by: Alexander Tunik <2braven@gmail.com>
1 parent 468d0d6 commit 0db9145

27 files changed

+3303
-17
lines changed

cmd/thanos/query.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
4848
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4949
"github.com/thanos-io/thanos/pkg/store"
50+
"github.com/thanos-io/thanos/pkg/targets"
5051
"github.com/thanos-io/thanos/pkg/tls"
5152
"github.com/thanos-io/thanos/pkg/ui"
5253
)
@@ -108,6 +109,10 @@ func registerQuery(app *extkingpin.App) {
108109
exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
109110
Hidden().PlaceHolder("<exemplar>").Strings()
110111

112+
// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
113+
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
114+
Hidden().PlaceHolder("<target>").Strings()
115+
111116
strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
112117
PlaceHolder("<staticstore>").Strings()
113118

@@ -135,6 +140,9 @@ func registerQuery(app *extkingpin.App) {
135140
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
136141
Hidden().Default("true").Bool()
137142

143+
enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling.").
144+
Hidden().Default("true").Bool()
145+
138146
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
139147
Hidden().Default("true").Bool()
140148

@@ -178,6 +186,10 @@ func registerQuery(app *extkingpin.App) {
178186
return errors.Wrap(err, "error while parsing config for request logging")
179187
}
180188

189+
if dup := firstDuplicate(*targetEndpoints); dup != "" {
190+
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
191+
}
192+
181193
var fileSD *file.Discovery
182194
if len(*fileSDFiles) > 0 {
183195
conf := &file.SDConfig{
@@ -232,11 +244,13 @@ func registerQuery(app *extkingpin.App) {
232244
getFlagsMap(cmd.Flags()),
233245
*stores,
234246
*ruleEndpoints,
247+
*targetEndpoints,
235248
*metadataEndpoints,
236249
*exemplarEndpoints,
237250
*enableAutodownsampling,
238251
*enableQueryPartialResponse,
239252
*enableRulePartialResponse,
253+
*enableTargetPartialResponse,
240254
*enableMetricMetadataPartialResponse,
241255
fileSD,
242256
time.Duration(*dnsSDInterval),
@@ -290,11 +304,13 @@ func runQuery(
290304
flagsMap map[string]string,
291305
storeAddrs []string,
292306
ruleAddrs []string,
307+
targetAddrs []string,
293308
metadataAddrs []string,
294309
exemplarAddrs []string,
295310
enableAutodownsampling bool,
296311
enableQueryPartialResponse bool,
297312
enableRulePartialResponse bool,
313+
enableTargetPartialResponse bool,
298314
enableMetricMetadataPartialResponse bool,
299315
fileSD *file.Discovery,
300316
dnsSDInterval time.Duration,
@@ -336,6 +352,12 @@ func runQuery(
336352
dns.ResolverType(dnsSDResolver),
337353
)
338354

355+
dnsTargetProvider := dns.NewProvider(
356+
logger,
357+
extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg),
358+
dns.ResolverType(dnsSDResolver),
359+
)
360+
339361
dnsMetadataProvider := dns.NewProvider(
340362
logger,
341363
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
@@ -374,6 +396,13 @@ func runQuery(
374396

375397
return specs
376398
},
399+
func() (specs []query.TargetSpec) {
400+
for _, addr := range dnsTargetProvider.Addresses() {
401+
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
402+
}
403+
404+
return specs
405+
},
377406
func() (specs []query.MetadataSpec) {
378407
for _, addr := range dnsMetadataProvider.Addresses() {
379408
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
@@ -393,6 +422,7 @@ func runQuery(
393422
)
394423
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
395424
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
425+
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
396426
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
397427
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsClients)
398428
queryableCreator = query.NewQueryableCreator(
@@ -481,6 +511,9 @@ func runQuery(
481511
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
482512
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
483513
}
514+
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
515+
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
516+
}
484517
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
485518
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
486519
}
@@ -534,11 +567,13 @@ func runQuery(
534567
queryableCreator,
535568
// NOTE: Will share the same replica label as the query for now.
536569
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
570+
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
537571
metadata.NewGRPCClient(metadataProxy),
538572
exemplars.NewGRPCClientWithDedup(exemplarsProxy, queryReplicaLabels),
539573
enableAutodownsampling,
540574
enableQueryPartialResponse,
541575
enableRulePartialResponse,
576+
enableTargetPartialResponse,
542577
enableMetricMetadataPartialResponse,
543578
queryReplicaLabels,
544579
flagsMap,
@@ -581,6 +616,7 @@ func runQuery(
581616
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
582617
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
583618
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
619+
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
584620
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
585621
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarsProxy)),
586622
grpcserver.WithListen(grpcBindAddr),

cmd/thanos/sidecar.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4444
"github.com/thanos-io/thanos/pkg/shipper"
4545
"github.com/thanos-io/thanos/pkg/store"
46+
"github.com/thanos-io/thanos/pkg/targets"
4647
"github.com/thanos-io/thanos/pkg/tls"
4748
"github.com/thanos-io/thanos/pkg/tracing"
4849
)
@@ -230,6 +231,7 @@ func runSidecar(
230231
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
231232
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
232233
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
234+
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
233235
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
234236
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
235237
grpcserver.WithListen(conf.grpc.bindAddress),

0 commit comments

Comments
 (0)