Skip to content

Commit e5d96c8

Browse files
simonpasquierbwplotka
authored andcommitted
pkg/rule: retain original path for rule files (#1785)
This change ensures that the /api/v1/rules endpoint returns the original path of the rule file instead of the path of the temporary file generated by Thanos ruler. Signed-off-by: Simon Pasquier <spasquie@redhat.com>
1 parent a4241a9 commit e5d96c8

File tree

9 files changed

+218
-137
lines changed

9 files changed

+218
-137
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
3131
- [#1568](https://github.com/thanos-io/thanos/pull/1709) Thanos Store now retains the first raw value of a chunk during downsampling to avoid losing some counter resets that occur on an aggregation boundary.
3232
- [#1773](https://github.com/thanos-io/thanos/pull/1773) Thanos Ruler: fixed the /api/v1/rules endpoint that returned 500 status code with `failed to assert type of rule ...` message.
3333
- [#1770](https://github.com/thanos-io/thanos/pull/1770) Fix `--web.external-prefix` 404s for static resources.
34+
- [#1785](https://github.com/thanos-io/thanos/pull/1785) Thanos Ruler: the /api/v1/rules endpoints now returns the original rule filenames.
3435

3536
### Changed
3637

cmd/thanos/rule.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func runRule(
291291
var (
292292
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
293293
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
294-
ruleMgrs = thanosrule.Managers{}
294+
ruleMgr = thanosrule.NewManager(dataDir)
295295
)
296296
{
297297
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
@@ -338,15 +338,16 @@ func runRule(
338338
opts.Context = ctx
339339
opts.QueryFunc = queryFunc(logger, dnsProvider, duplicatedQuery, ruleEvalWarnings, s)
340340

341-
ruleMgrs[s] = rules.NewManager(&opts)
341+
mgr := rules.NewManager(&opts)
342+
ruleMgr.SetRuleManager(s, mgr)
342343
g.Add(func() error {
343-
ruleMgrs[s].Run()
344+
mgr.Run()
344345
<-ctx.Done()
345346

346347
return nil
347348
}, func(error) {
348349
cancel()
349-
ruleMgrs[s].Stop()
350+
mgr.Stop()
350351
})
351352
}
352353
}
@@ -447,7 +448,7 @@ func runRule(
447448

448449
level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
449450

450-
if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
451+
if err := ruleMgr.Update(evalInterval, files); err != nil {
451452
configSuccess.Set(0)
452453
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
453454
continue
@@ -457,10 +458,8 @@ func runRule(
457458
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)
458459

459460
rulesLoaded.Reset()
460-
for s, mgr := range ruleMgrs {
461-
for _, group := range mgr.RuleGroups() {
462-
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
463-
}
461+
for _, group := range ruleMgr.RuleGroups() {
462+
rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
464463
}
465464

466465
}
@@ -547,9 +546,9 @@ func runRule(
547546

548547
ins := extpromhttp.NewInstrumentationMiddleware(reg)
549548

550-
ui.NewRuleUI(logger, reg, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)
549+
ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)
551550

552-
api := v1.NewAPI(logger, reg, ruleMgrs)
551+
api := v1.NewAPI(logger, reg, ruleMgr)
553552
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
554553

555554
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.

pkg/rule/api/v1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) {
6969
for _, grp := range api.ruleRetriever.RuleGroups() {
7070
apiRuleGroup := &RuleGroup{
7171
Name: grp.Name(),
72-
File: grp.File(),
72+
File: grp.OriginalFile(),
7373
Interval: grp.Interval().Seconds(),
7474
Rules: []rule{},
7575
PartialResponseStrategy: grp.PartialResponseStrategy.String(),

pkg/rule/api/v1_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/prometheus/prometheus/storage/tsdb"
2424
qapi "github.com/thanos-io/thanos/pkg/query/api"
2525
thanosrule "github.com/thanos-io/thanos/pkg/rule"
26+
"github.com/thanos-io/thanos/pkg/store/storepb"
2627
)
2728

2829
// NewStorage returns a new storage for testing purposes
@@ -92,8 +93,12 @@ func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group {
9293
recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
9394
r = append(r, recordingRule)
9495

95-
group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts)
96-
return []thanosrule.Group{thanosrule.Group{Group: group}}
96+
return []thanosrule.Group{
97+
thanosrule.Group{
98+
Group: rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts),
99+
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
100+
},
101+
}
97102
}
98103

99104
func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule {
@@ -189,7 +194,7 @@ func testEndpoints(t *testing.T, api *API) {
189194
RuleGroups: []*RuleGroup{
190195
{
191196
Name: "grp",
192-
File: "/path/to/file",
197+
File: "",
193198
Interval: 1,
194199
PartialResponseStrategy: "WARN",
195200
Rules: []rule{
@@ -263,13 +268,14 @@ func testEndpoints(t *testing.T, api *API) {
263268
}
264269

265270
func assertAPIError(t *testing.T, got *qapi.ApiError) {
271+
t.Helper()
266272
if got != nil {
267273
t.Fatalf("Unexpected error: %s", got)
268-
return
269274
}
270275
}
271276

272277
func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) {
278+
t.Helper()
273279
if !reflect.DeepEqual(exp, got) {
274280
respJSON, err := json.Marshal(got)
275281
if err != nil {

pkg/rule/rule.go

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"fmt"
55
"io/ioutil"
66
"os"
7-
"path"
87
"path/filepath"
98
"strings"
9+
"sync"
1010
"time"
1111

1212
"github.com/pkg/errors"
@@ -21,9 +21,14 @@ const tmpRuleDir = ".tmp-rules"
2121

2222
type Group struct {
2323
*rules.Group
24+
originalFile string
2425
PartialResponseStrategy storepb.PartialResponseStrategy
2526
}
2627

28+
func (g Group) OriginalFile() string {
29+
return g.originalFile
30+
}
31+
2732
type AlertingRule struct {
2833
*rules.AlertingRule
2934
PartialResponseStrategy storepb.PartialResponseStrategy
@@ -38,21 +43,45 @@ type RuleGroup struct {
3843
PartialResponseStrategy *storepb.PartialResponseStrategy
3944
}
4045

41-
type Managers map[storepb.PartialResponseStrategy]*rules.Manager
46+
type Manager struct {
47+
workDir string
48+
mgrs map[storepb.PartialResponseStrategy]*rules.Manager
49+
50+
mtx sync.RWMutex
51+
ruleFiles map[string]string
52+
}
4253

43-
func (m Managers) RuleGroups() []Group {
54+
func NewManager(dataDir string) *Manager {
55+
return &Manager{
56+
workDir: filepath.Join(dataDir, tmpRuleDir),
57+
mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager),
58+
ruleFiles: make(map[string]string),
59+
}
60+
}
61+
62+
func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.Manager) {
63+
m.mgrs[s] = mgr
64+
}
65+
66+
func (m *Manager) RuleGroups() []Group {
67+
m.mtx.RLock()
68+
defer m.mtx.RUnlock()
4469
var res []Group
45-
for s, r := range m {
70+
for s, r := range m.mgrs {
4671
for _, group := range r.RuleGroups() {
47-
res = append(res, Group{Group: group, PartialResponseStrategy: s})
72+
res = append(res, Group{
73+
Group: group,
74+
PartialResponseStrategy: s,
75+
originalFile: m.ruleFiles[group.File()],
76+
})
4877
}
4978
}
5079
return res
5180
}
5281

53-
func (m Managers) AlertingRules() []AlertingRule {
82+
func (m *Manager) AlertingRules() []AlertingRule {
5483
var res []AlertingRule
55-
for s, r := range m {
84+
for s, r := range m.mgrs {
5685
for _, r := range r.AlertingRules() {
5786
res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s})
5887
}
@@ -67,12 +96,12 @@ func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error {
6796

6897
errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ","))
6998
if err := unmarshal(&rs); err != nil {
70-
return errors.Wrapf(err, errMsg)
99+
return errors.Wrap(err, errMsg)
71100
}
72101

73102
rg := rulefmt.RuleGroup{}
74103
if err := unmarshal(&rg); err != nil {
75-
return errors.Wrapf(err, errMsg)
104+
return errors.Wrap(err, "failed to unmarshal rulefmt.RuleGroup")
76105
}
77106

78107
p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)]
@@ -110,17 +139,18 @@ func (r RuleGroup) MarshalYAML() (interface{}, error) {
110139

111140
// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on
112141
// special field in RuleGroup file.
113-
func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []string) error {
142+
func (m *Manager) Update(evalInterval time.Duration, files []string) error {
114143
var (
115-
errs = tsdberrors.MultiError{}
144+
errs tsdberrors.MultiError
116145
filesByStrategy = map[storepb.PartialResponseStrategy][]string{}
146+
ruleFiles = map[string]string{}
117147
)
118148

119-
if err := os.RemoveAll(path.Join(dataDir, tmpRuleDir)); err != nil {
120-
return errors.Wrapf(err, "rm %s", path.Join(dataDir, tmpRuleDir))
149+
if err := os.RemoveAll(m.workDir); err != nil {
150+
return errors.Wrapf(err, "failed to remove %s", m.workDir)
121151
}
122-
if err := os.MkdirAll(path.Join(dataDir, tmpRuleDir), os.ModePerm); err != nil {
123-
return errors.Wrapf(err, "mkdir %s", path.Join(dataDir, tmpRuleDir))
152+
if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil {
153+
return errors.Wrapf(err, "failed to create %s", m.workDir)
124154
}
125155

126156
for _, fn := range files {
@@ -132,7 +162,7 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
132162

133163
var rg RuleGroups
134164
if err := yaml.Unmarshal(b, &rg); err != nil {
135-
errs = append(errs, err)
165+
errs = append(errs, errors.Wrap(err, fn))
136166
continue
137167
}
138168

@@ -153,33 +183,37 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
153183
for s, rg := range groupsByStrategy {
154184
b, err := yaml.Marshal(rg)
155185
if err != nil {
156-
errs = append(errs, err)
186+
errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn))
157187
continue
158188
}
159189

160-
newFn := path.Join(dataDir, tmpRuleDir, filepath.Base(fn)+"."+s.String())
190+
newFn := filepath.Join(m.workDir, filepath.Base(fn)+"."+s.String())
161191
if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil {
162-
errs = append(errs, err)
192+
errs = append(errs, errors.Wrap(err, newFn))
163193
continue
164194
}
165195

166196
filesByStrategy[s] = append(filesByStrategy[s], newFn)
197+
ruleFiles[newFn] = fn
167198
}
168199
}
169200

201+
m.mtx.Lock()
170202
for s, fs := range filesByStrategy {
171-
mgr, ok := (*m)[s]
203+
mgr, ok := m.mgrs[s]
172204
if !ok {
173-
errs = append(errs, errors.Errorf("no updater found for %v", s))
205+
errs = append(errs, errors.Errorf("no manager found for %v", s))
174206
continue
175207
}
176208
// We add external labels in `pkg/alert.Queue`.
177209
// TODO(bwplotka): Investigate if we should put ext labels here or not.
178210
if err := mgr.Update(evalInterval, fs, nil); err != nil {
179-
errs = append(errs, err)
211+
errs = append(errs, errors.Wrapf(err, "strategy %s", s))
180212
continue
181213
}
182214
}
215+
m.ruleFiles = ruleFiles
216+
m.mtx.Unlock()
183217

184218
return errs.Err()
185219
}

0 commit comments

Comments
 (0)