Skip to content

Commit 00dd8d8

Browse files
lilic and johncming committed
rule: Fix bug when rules were out of sync
Co-authored-by: johncming <johncming@yahoo.com>
Signed-off-by: Lili Cosic <cosiclili@gmail.com>
1 parent e1189f5 commit 00dd8d8

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1616
- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
1717
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
1818
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed
19+
- [#2615](https://github.com/thanos-io/thanos/pull/2615) Rule: Fix bugs where rules were out of sync.
1920

2021
### Added
2122

cmd/thanos/rule.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,8 +763,9 @@ func reloadRules(logger log.Logger,
763763
metrics *RuleMetrics) error {
764764
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
765765
var (
766-
errs tsdberrors.MultiError
767-
files []string
766+
errs tsdberrors.MultiError
767+
files []string
768+
seenFiles = make(map[string]struct{})
768769
)
769770
for _, pat := range ruleFiles {
770771
fs, err := filepath.Glob(pat)
@@ -775,6 +776,13 @@ func reloadRules(logger log.Logger,
775776
}
776777

777778
files = append(files, fs...)
779+
for _, fp := range fs {
780+
if _, ok := seenFiles[fp]; ok {
781+
continue
782+
}
783+
files = append(files, fp)
784+
seenFiles[fp] = struct{}{}
785+
}
778786
}
779787

780788
level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

pkg/rule/rule.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,17 @@ func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.M
7070
func (m *Manager) RuleGroups() []Group {
7171
m.mtx.RLock()
7272
defer m.mtx.RUnlock()
73-
var res []Group
73+
var groups []Group
7474
for s, r := range m.mgrs {
7575
for _, group := range r.RuleGroups() {
76-
res = append(res, Group{
76+
groups = append(groups, Group{
7777
Group: group,
7878
PartialResponseStrategy: s,
7979
originalFile: m.ruleFiles[group.File()],
8080
})
8181
}
8282
}
83-
return res
83+
return groups
8484
}
8585

8686
func (m *Manager) AlertingRules() []AlertingRule {
@@ -216,6 +216,21 @@ func (m *Manager) Update(evalInterval time.Duration, files []string) error {
216216
continue
217217
}
218218
}
219+
220+
// Removes the rules from a manager when a strategy has no more rule.
221+
for s, mgr := range m.mgrs {
222+
if _, ok := filesByStrategy[s]; ok {
223+
continue
224+
}
225+
226+
if len(mgr.RuleGroups()) == 0 {
227+
continue
228+
}
229+
230+
if err := mgr.Update(evalInterval, []string{}, nil); err != nil {
231+
errs = append(errs, err)
232+
}
233+
}
219234
m.ruleFiles = ruleFiles
220235
m.mtx.Unlock()
221236

pkg/rule/rule_test.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,17 @@ groups:
6666
Appendable: nopAppendable{},
6767
}
6868
thanosRuleMgr := NewManager(dir)
69-
ruleMgr := rules.NewManager(&opts)
70-
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgr)
71-
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgr)
69+
ruleMgrAbort := rules.NewManager(&opts)
70+
ruleMgrWarn := rules.NewManager(&opts)
71+
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort)
72+
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn)
7273

73-
testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))
74+
ruleMgrAbort.Run()
75+
ruleMgrWarn.Run()
76+
defer ruleMgrAbort.Stop()
77+
defer ruleMgrWarn.Stop()
7478

75-
ruleMgr.Run()
76-
defer ruleMgr.Stop()
79+
testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))
7780

7881
select {
7982
case <-time.After(2 * time.Minute):
@@ -225,6 +228,44 @@ groups:
225228
}
226229
}
227230

231+
func TestUpdateAfterClear(t *testing.T) {
232+
dir, err := ioutil.TempDir("", "test_rule_rule_groups")
233+
testutil.Ok(t, err)
234+
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
235+
236+
testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "no_strategy.yaml"), []byte(`
237+
groups:
238+
- name: "something1"
239+
rules:
240+
- alert: "some"
241+
expr: "up"
242+
`), os.ModePerm))
243+
244+
opts := rules.ManagerOptions{
245+
Logger: log.NewLogfmtLogger(os.Stderr),
246+
}
247+
m := NewManager(dir)
248+
ruleMgrAbort := rules.NewManager(&opts)
249+
ruleMgrWarn := rules.NewManager(&opts)
250+
m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort)
251+
m.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn)
252+
253+
ruleMgrAbort.Run()
254+
ruleMgrWarn.Run()
255+
defer ruleMgrAbort.Stop()
256+
defer ruleMgrWarn.Stop()
257+
258+
err = m.Update(1*time.Second, []string{
259+
filepath.Join(dir, "no_strategy.yaml"),
260+
})
261+
testutil.Ok(t, err)
262+
testutil.Equals(t, 1, len(m.RuleGroups()))
263+
264+
err = m.Update(1*time.Second, []string{})
265+
testutil.Ok(t, err)
266+
testutil.Equals(t, 0, len(m.RuleGroups()))
267+
}
268+
228269
func TestRuleGroupMarshalYAML(t *testing.T) {
229270
const expected = `groups:
230271
- name: something1

0 commit comments

Comments
 (0)