Skip to content

Commit 65b3117

Browse files
author
Alexey Lebedeff
committed
Add support for BERT-encoded RabbitMQ replies
Since 3.6.9 (see rabbitmq/rabbitmq-management#367) RabbitMQ supports BERT encoding as a JSON alternative. Given that BERT encoding is implemented in C inside the Erlang VM, it's way more effective than pure-Erlang JSON encoding. So this greatly reduces monitoring overhead when we have a lot of objects in RabbitMQ.
1 parent 95cb2d5 commit 65b3117

33 files changed

+2215
-51
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ following capabilities are currently supported in
5555
starting from version 3.6.8. This option can be safely enabled on
5656
earlier 3.6.X versions, but it'll not give any performance
5757
improvements. And it's incompatible with 3.4.X and 3.5.X.
58+
* `bert`: Since 3.6.9 (see
59+
https://github.com/rabbitmq/rabbitmq-management/pull/367) RabbitMQ
60+
supports BERT encoding as a JSON alternative. Given that BERT
61+
encoding is implemented in C inside the Erlang VM, it's way more
62+
effective than pure-Erlang JSON encoding. So this greatly reduces
63+
monitoring overhead when we have a lot of objects in RabbitMQ.
64+
5865

5966
### Metrics
6067

bertmap.go

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
log "github.com/Sirupsen/logrus"
6+
bert "github.com/landonia/gobert"
7+
"math/big"
8+
)
9+
10+
// rabbitBERTReply (along with its RabbitReply interface
11+
// implementation) allow parsing of BERT-encoded RabbitMQ replies in a
12+
// way that fully compatible with JSON parser from jsonmap.go
13+
type rabbitBERTReply struct {
14+
body []byte
15+
}
16+
17+
func MakeBERTReply(body []byte) RabbitReply {
18+
return &rabbitBERTReply{body}
19+
}
20+
21+
func (rep *rabbitBERTReply) MakeStatsInfo() []StatsInfo {
22+
rawObjects, err := bert.Decode(rep.body)
23+
if err != nil {
24+
log.WithField("error", err).Error("Error while decoding bert")
25+
return make([]StatsInfo, 0)
26+
}
27+
28+
objects, ok := rawObjects.([]bert.Term)
29+
if !ok {
30+
log.WithField("got", rawObjects).Error("Statistics reply should contain a slice of objects")
31+
return make([]StatsInfo, 0)
32+
}
33+
34+
statistics := make([]StatsInfo, 0, len(objects))
35+
36+
for _, v := range objects {
37+
obj, ok := parseSingleStatsObject(v)
38+
if !ok {
39+
log.WithField("got", v).Error("Ignoring unparseable stats object")
40+
continue
41+
}
42+
statistics = append(statistics, *obj)
43+
}
44+
45+
return statistics
46+
}
47+
48+
func (rep *rabbitBERTReply) MakeMap() MetricMap {
49+
flMap := make(MetricMap)
50+
term, err := bert.Decode(rep.body)
51+
52+
if err != nil {
53+
log.WithField("error", err).Error("Error while decoding bert")
54+
return flMap
55+
}
56+
57+
parseProplist(&flMap, "", term)
58+
return flMap
59+
}
60+
61+
// iterateBertKV helps to traverse any map-like structures returned by
62+
// RabbitMQ with a user-provided function. We need it because
63+
// different versions of RabbitMQ can encode a map in a bunch of
64+
// different ways:
65+
// - proplist
66+
// - proplist additionally wrapped in a {struct, ...} tuple
67+
// - map type available in modern erlang versions
68+
//
69+
// Non-nil error return means that an object can't be interpreted as a map in any way
70+
//
71+
// Provided function can return 'false' value to stop traversal earlier
72+
func iterateBertKV(obj interface{}, elemFunc func(string, interface{}) bool) error {
73+
switch obj := obj.(type) {
74+
case []bert.Term:
75+
pairs, ok := assertBertProplistPairs(obj)
76+
if !ok {
77+
return BertError("Doesn't look like a proplist", obj)
78+
}
79+
for _, v := range pairs {
80+
key, value, ok := assertBertKeyedTuple(v)
81+
if ok {
82+
needToContinue := elemFunc(key, value)
83+
if !needToContinue {
84+
return nil
85+
}
86+
}
87+
}
88+
return nil
89+
case bert.Map:
90+
for keyRaw, value := range obj {
91+
key, ok := parseBertStringy(keyRaw)
92+
if ok {
93+
needToContinue := elemFunc(key, value)
94+
if !needToContinue {
95+
return nil
96+
}
97+
}
98+
}
99+
return nil
100+
default:
101+
return BertError("Can't iterate over non-KV object", obj)
102+
}
103+
}
104+
105+
// parseSingleStatsObject extracts information about a named RabbitMQ
106+
// object: both its vhost/name information and then the usual
107+
// MetricMap.
108+
func parseSingleStatsObject(obj interface{}) (*StatsInfo, bool) {
109+
var ok bool
110+
var result StatsInfo
111+
var objectOk = true
112+
result.metrics = make(MetricMap)
113+
114+
err := iterateBertKV(obj, func(key string, value interface{}) bool {
115+
switch {
116+
case key == "name":
117+
result.name, ok = parseBertStringy(value)
118+
if !ok {
119+
log.WithField("got", value).Error("Non-string 'name' field")
120+
objectOk = false
121+
return false
122+
}
123+
case key == "vhost":
124+
result.vhost, ok = parseBertStringy(value)
125+
if !ok {
126+
log.WithField("got", value).Error("Non-string 'vhost' field")
127+
objectOk = false
128+
return false
129+
}
130+
case key == "policy":
131+
result.policy, _ = parseBertStringy(value)
132+
default:
133+
if key == "durable" {
134+
// We want to have 'durable' in 2
135+
// places: inside MetricMap (and
136+
// converted to float) and as a string
137+
// field in StatsInfo
138+
tmp, ok := parseBertStringy(value)
139+
if ok {
140+
result.durable = tmp
141+
}
142+
}
143+
if floatValue, ok := parseFloaty(value); ok {
144+
result.metrics[key] = floatValue
145+
return true
146+
}
147+
148+
// Nested structures don't need special
149+
// processing, so we fallback to generic
150+
// parser.
151+
if err := parseProplist(&result.metrics, key, value); err == nil {
152+
return true
153+
}
154+
}
155+
return true
156+
})
157+
if err == nil && objectOk {
158+
return &result, true
159+
} else {
160+
return nil, false
161+
}
162+
}
163+
164+
// parseProplist descends into an erlang data structure and stores
165+
// everything remotely resembling a float in a toMap.
166+
func parseProplist(toMap *MetricMap, basename string, maybeProplist interface{}) error {
167+
prefix := ""
168+
if basename != "" {
169+
prefix = basename + "."
170+
}
171+
return iterateBertKV(maybeProplist, func(key string, value interface{}) bool {
172+
if floatValue, ok := parseFloaty(value); ok {
173+
(*toMap)[prefix+key] = floatValue
174+
return true
175+
}
176+
177+
parseProplist(toMap, prefix+key, value) // This can fail, but we don't care
178+
return true
179+
})
180+
}
181+
182+
// assertBertSlice checks whether the provided value is something
183+
// that's represented as a slice by BERT parcer (list or tuple).
184+
func assertBertSlice(maybeSlice interface{}) ([]bert.Term, bool) {
185+
switch it := maybeSlice.(type) {
186+
case []bert.Term:
187+
return it, true
188+
default:
189+
return nil, false
190+
}
191+
}
192+
193+
// assertBertKeyedTuple checks whether the provided value looks like
194+
// an element of proplist - 2-element tuple where the first elemen is
195+
// an atom.
196+
func assertBertKeyedTuple(maybeTuple interface{}) (string, bert.Term, bool) {
197+
tuple, ok := assertBertSlice(maybeTuple)
198+
if !ok {
199+
return "", nil, false
200+
}
201+
if len(tuple) != 2 {
202+
return "", nil, false
203+
}
204+
key, ok := assertBertAtom(tuple[0])
205+
if !ok {
206+
return "", nil, false
207+
}
208+
return key, tuple[1].(bert.Term), true
209+
}
210+
211+
func assertBertAtom(val interface{}) (string, bool) {
212+
if atom, ok := val.(bert.Atom); ok {
213+
return string(atom), true
214+
}
215+
return "", false
216+
}
217+
218+
// assertBertProplistPairs checks whether the provided value points to
219+
// a proplist. Additional level of {struct, ...} wrapping can be
220+
// removed in process.
221+
func assertBertProplistPairs(maybeTaggedProplist interface{}) ([]bert.Term, bool) {
222+
terms, ok := assertBertSlice(maybeTaggedProplist)
223+
if !ok {
224+
return nil, false
225+
}
226+
227+
if len(terms) == 0 {
228+
return terms, true
229+
}
230+
231+
// Strip {struct, ...} tagging than is used to help RabbitMQ
232+
// JSON encoder
233+
key, value, ok := assertBertKeyedTuple(terms)
234+
if ok && key == "struct" {
235+
return assertBertProplistPairs(value)
236+
}
237+
238+
// Minimal safety check - at least the first element should be
239+
// a proplist pair
240+
_, _, ok = assertBertKeyedTuple(terms[0])
241+
if ok {
242+
return terms, true
243+
}
244+
return nil, false
245+
}
246+
247+
// parseFloaty tries to interpret the provided BERT value as a
248+
// float. Floats itself, integers and booleans are handled.
249+
func parseFloaty(num interface{}) (float64, bool) {
250+
switch num := num.(type) {
251+
case int:
252+
return float64(num), true
253+
case int8:
254+
return float64(num), true
255+
case int16:
256+
return float64(num), true
257+
case int32:
258+
return float64(num), true
259+
case int64:
260+
return float64(num), true
261+
case uint:
262+
return float64(num), true
263+
case uint8:
264+
return float64(num), true
265+
case uint16:
266+
return float64(num), true
267+
case uint32:
268+
return float64(num), true
269+
case uint64:
270+
return float64(num), true
271+
case float32:
272+
return float64(num), true
273+
case float64:
274+
return num, true
275+
case bert.Atom:
276+
if num == bert.TrueAtom {
277+
return 1, true
278+
} else if num == bert.FalseAtom {
279+
return 0, true
280+
}
281+
case big.Int:
282+
bigFloat := new(big.Float).SetInt(&num)
283+
result, _ := bigFloat.Float64()
284+
return result, true
285+
}
286+
return 0, false
287+
}
288+
289+
// parseBertStringy tries to extract an Erlang value that can be
290+
// represented as a Go string (binary or atom).
291+
func parseBertStringy(val interface{}) (string, bool) {
292+
if stringer, ok := val.(fmt.Stringer); ok {
293+
return stringer.String(), true
294+
} else if atom, ok := val.(bert.Atom); ok {
295+
return string(atom), true
296+
}
297+
return "", false
298+
}
299+
300+
type bertDecodeError struct {
301+
message string
302+
object interface{}
303+
}
304+
305+
func (err *bertDecodeError) Error() string {
306+
return fmt.Sprintf("%s while decoding: %s", err.message, err.object)
307+
}
308+
309+
func BertError(message string, object interface{}) error {
310+
return &bertDecodeError{message, object}
311+
}

config.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ type rabbitCapabilitySet map[rabbitCapability]bool
4141

4242
const (
4343
rabbitCapNoSort rabbitCapability = "no_sort"
44+
rabbitCapBert rabbitCapability = "bert"
4445
)
4546

4647
var allRabbitCapabilities = rabbitCapabilitySet{
4748
rabbitCapNoSort: true,
49+
rabbitCapBert: true,
4850
}
4951

5052
func initConfig() {
@@ -53,7 +55,6 @@ func initConfig() {
5355
if valid, _ := regexp.MatchString("https?://[a-zA-Z.0-9]+", strings.ToLower(url)); valid {
5456
config.RabbitURL = url
5557
}
56-
5758
}
5859

5960
if user := os.Getenv("RABBIT_USER"); user != "" {
@@ -106,3 +107,8 @@ func parseCapabilities(raw string) rabbitCapabilitySet {
106107
}
107108
return result
108109
}
110+
111+
func isCapEnabled(config rabbitExporterConfig, cap rabbitCapability) bool {
112+
exists, enabled := config.RabbitCapabilities[cap]
113+
return exists && enabled
114+
}

config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestConfig_Capabilities(t *testing.T) {
9595
t.Error("Capability set should be empty by default")
9696
}
9797

98-
var needToSupport = []rabbitCapability{"no_sort"}
98+
var needToSupport = []rabbitCapability{"no_sort", "bert"}
9999
for _, cap := range needToSupport {
100100
os.Setenv("RABBIT_CAPABILITIES", "junk_cap, another_with_spaces_around , "+string(cap)+", done")
101101
initConfig()

0 commit comments

Comments
 (0)