Skip to content

Commit f384750

Browse files
committed
federated targets functionality (#1375)
Signed-off-by: Alexander Tunik <2braven@gmail.com>
1 parent 7ec45ef commit f384750

27 files changed

+3065
-18
lines changed

.bingo/go.mod

100755100644
File mode changed.

cmd/thanos/query.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
4343
httpserver "github.com/thanos-io/thanos/pkg/server/http"
4444
"github.com/thanos-io/thanos/pkg/store"
45+
"github.com/thanos-io/thanos/pkg/targets"
4546
"github.com/thanos-io/thanos/pkg/tls"
4647
"github.com/thanos-io/thanos/pkg/ui"
4748
)
@@ -94,6 +95,10 @@ func registerQuery(app *extkingpin.App) {
9495
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
9596
Hidden().PlaceHolder("<rule>").Strings()
9697

98+
// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
99+
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured targets API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
100+
Hidden().PlaceHolder("<target>").Strings()
101+
97102
strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
98103
PlaceHolder("<staticstore>").Strings()
99104

@@ -139,6 +144,10 @@ func registerQuery(app *extkingpin.App) {
139144
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
140145
}
141146

147+
if dup := firstDuplicate(*targetEndpoints); dup != "" {
148+
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
149+
}
150+
142151
var fileSD *file.Discovery
143152
if len(*fileSDFiles) > 0 {
144153
conf := &file.SDConfig{
@@ -188,6 +197,7 @@ func registerQuery(app *extkingpin.App) {
188197
getFlagsMap(cmd.Flags()),
189198
*stores,
190199
*ruleEndpoints,
200+
*targetEndpoints,
191201
*enableAutodownsampling,
192202
*enableQueryPartialResponse,
193203
*enableRulePartialResponse,
@@ -237,6 +247,7 @@ func runQuery(
237247
flagsMap map[string]string,
238248
storeAddrs []string,
239249
ruleAddrs []string,
250+
targetAddrs []string,
240251
enableAutodownsampling bool,
241252
enableQueryPartialResponse bool,
242253
enableRulePartialResponse bool,
@@ -279,6 +290,12 @@ func runQuery(
279290
dns.ResolverType(dnsSDResolver),
280291
)
281292

293+
dnsTargetProvider := dns.NewProvider(
294+
logger,
295+
extprom.WrapRegistererWithPrefix("thanos_querier_target_apis_", reg),
296+
dns.ResolverType(dnsSDResolver),
297+
)
298+
282299
var (
283300
stores = query.NewStoreSet(
284301
logger,
@@ -305,11 +322,22 @@ func runQuery(
305322

306323
return specs
307324
},
325+
func() (specs []query.TargetSpec) {
326+
for _, addr := range dnsTargetProvider.Addresses() {
327+
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
328+
}
329+
330+
// NOTE(s-urbaniak): No need to remove duplicates, as target apis are a subset of store apis.
331+
// hence, any duplicates will be tracked in the store api set.
332+
333+
return specs
334+
},
308335
dialOpts,
309336
unhealthyStoreTimeout,
310337
)
311338
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
312339
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
340+
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
313341
queryableCreator = query.NewQueryableCreator(
314342
logger,
315343
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
@@ -394,6 +422,9 @@ func runQuery(
394422
if err := dnsRuleProvider.Resolve(ctx, ruleAddrs); err != nil {
395423
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
396424
}
425+
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
426+
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
427+
}
397428
return nil
398429
})
399430
}, func(error) {
@@ -444,6 +475,7 @@ func runQuery(
444475
queryableCreator,
445476
// NOTE: Will share the same replica label as the query for now.
446477
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
478+
targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels),
447479
enableAutodownsampling,
448480
enableQueryPartialResponse,
449481
enableRulePartialResponse,
@@ -486,6 +518,7 @@ func runQuery(
486518
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
487519
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
488520
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
521+
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
489522
grpcserver.WithListen(grpcBindAddr),
490523
grpcserver.WithGracePeriod(grpcGracePeriod),
491524
grpcserver.WithTLSConfig(tlsCfg),

cmd/thanos/sidecar.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
httpserver "github.com/thanos-io/thanos/pkg/server/http"
3939
"github.com/thanos-io/thanos/pkg/shipper"
4040
"github.com/thanos-io/thanos/pkg/store"
41+
"github.com/thanos-io/thanos/pkg/targets"
4142
"github.com/thanos-io/thanos/pkg/tls"
4243
"github.com/thanos-io/thanos/pkg/tracing"
4344
)
@@ -218,6 +219,7 @@ func runSidecar(
218219
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
219220
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
220221
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
222+
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
221223
grpcserver.WithListen(conf.grpc.bindAddress),
222224
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
223225
grpcserver.WithTLSConfig(tlsCfg),

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
1919
github.com/fatih/structtag v1.1.0
2020
github.com/felixge/fgprof v0.9.1
21+
github.com/fortytw2/leaktest v1.3.0
2122
github.com/fsnotify/fsnotify v1.4.9
2223
github.com/go-kit/kit v0.10.0
2324
github.com/go-openapi/strfmt v0.19.5

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ github.com/felixge/fgprof v0.9.1/go.mod h1:7/HK6JFtFaARhIljgP2IV8rJLIoHDoOYoUphs
284284
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
285285
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
286286
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
287+
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
287288
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
288289
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
289290
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=

pkg/api/query/v1.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ import (
4747
"github.com/thanos-io/thanos/pkg/rules/rulespb"
4848
"github.com/thanos-io/thanos/pkg/runutil"
4949
"github.com/thanos-io/thanos/pkg/store/storepb"
50+
"github.com/thanos-io/thanos/pkg/targets"
51+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
5052
"github.com/thanos-io/thanos/pkg/tracing"
5153
)
5254

@@ -67,6 +69,7 @@ type QueryAPI struct {
6769
queryableCreate query.QueryableCreator
6870
queryEngine *promql.Engine
6971
ruleGroups rules.UnaryClient
72+
targets targets.UnaryClient
7073

7174
enableAutodownsampling bool
7275
enableQueryPartialResponse bool
@@ -86,6 +89,7 @@ func NewQueryAPI(
8689
qe *promql.Engine,
8790
c query.QueryableCreator,
8891
ruleGroups rules.UnaryClient,
92+
targets targets.UnaryClient,
8993
enableAutodownsampling bool,
9094
enableQueryPartialResponse bool,
9195
enableRulePartialResponse bool,
@@ -102,6 +106,7 @@ func NewQueryAPI(
102106
queryableCreate: c,
103107
gate: gate,
104108
ruleGroups: ruleGroups,
109+
targets: targets,
105110

106111
enableAutodownsampling: enableAutodownsampling,
107112
enableQueryPartialResponse: enableQueryPartialResponse,
@@ -136,6 +141,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
136141
r.Get("/stores", instr("stores", qapi.stores))
137142

138143
r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))
144+
145+
r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, true)))
139146
}
140147

141148
type queryData struct {
@@ -562,6 +569,36 @@ func (qapi *QueryAPI) stores(r *http.Request) (interface{}, []error, *api.ApiErr
562569
return statuses, nil, nil
563570
}
564571

572+
func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
573+
ps := storepb.PartialResponseStrategy_ABORT
574+
if enablePartialResponse {
575+
ps = storepb.PartialResponseStrategy_WARN
576+
}
577+
578+
return func(r *http.Request) (interface{}, []error, *api.ApiError) {
579+
stateParam := r.URL.Query().Get("state")
580+
state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)]
581+
if !ok {
582+
if stateParam != "" {
583+
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)}
584+
}
585+
state = int32(targetspb.TargetsRequest_ANY)
586+
}
587+
588+
req := &targetspb.TargetsRequest{
589+
State: targetspb.TargetsRequest_State(state),
590+
PartialResponseStrategy: ps,
591+
}
592+
593+
t, warnings, err := client.Targets(r.Context(), req)
594+
if err != nil {
595+
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving targets: %v", err)}
596+
}
597+
598+
return t, warnings, nil
599+
}
600+
}
601+
565602
// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules
566603
// which uses gRPC Unary Rules API.
567604
func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {

pkg/promclient/promclient.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/thanos-io/thanos/pkg/rules/rulespb"
3434
"github.com/thanos-io/thanos/pkg/runutil"
3535
"github.com/thanos-io/thanos/pkg/store/storepb"
36+
"github.com/thanos-io/thanos/pkg/targets/targetspb"
3637
"github.com/thanos-io/thanos/pkg/tracing"
3738
"google.golang.org/grpc/codes"
3839
yaml "gopkg.in/yaml.v2"
@@ -708,3 +709,35 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
708709
}
709710
return m.Data.Groups, nil
710711
}
712+
713+
func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) {
714+
u := *base
715+
u.Path = path.Join(u.Path, "/api/v1/targets")
716+
717+
if stateTargets != "" {
718+
q := u.Query()
719+
q.Add("state", stateTargets)
720+
u.RawQuery = q.Encode()
721+
}
722+
723+
level.Debug(c.logger).Log("msg", "getting targets", "url", u.String())
724+
725+
span, ctx := tracing.StartSpan(ctx, "/targets HTTP[client]")
726+
defer span.Finish()
727+
728+
body, _, err := c.get2xx(ctx, &u)
729+
if err != nil {
730+
return nil, err
731+
}
732+
733+
// Decode only ResultType and load Result only as RawJson since we don't know
734+
// structure of the Result yet.
735+
var v struct {
736+
Data *targetspb.TargetDiscovery `json:"data"`
737+
}
738+
if err = json.Unmarshal(body, &v); err != nil {
739+
return nil, errors.Wrap(err, "unmarshal targets API response")
740+
}
741+
742+
return v.Data, nil
743+
}

0 commit comments

Comments
 (0)