Skip to content

Commit 3ce2979

Browse files
feat: support describegroups v5 (#1421)
* feat: support describegroups v5 * add basic unit tests * add v2 to test
1 parent 3f68080 commit 3ce2979

2 files changed

Lines changed: 213 additions & 17 deletions

File tree

protocol/describegroups/describegroups.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ func init() {
1010

1111
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeGroups
1212
type Request struct {
13-
Groups []string `kafka:"min=v0,max=v4"`
14-
IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"`
13+
// We need at least one tagged field to indicate that this is a "flexible" message
14+
// type.
15+
_ struct{} `kafka:"min=v5,max=v5,tag"`
16+
Groups []string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
17+
IncludeAuthorizedOperations bool `kafka:"min=v3,max=v5"`
1518
}
1619

1720
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
@@ -42,27 +45,36 @@ func (r *Request) Split(cluster protocol.Cluster) (
4245
}
4346

4447
type Response struct {
45-
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
46-
Groups []ResponseGroup `kafka:"min=v0,max=v4"`
48+
// We need at least one tagged field to indicate that this is a "flexible" message
49+
// type.
50+
_ struct{} `kafka:"min=v5,max=v5,tag"`
51+
ThrottleTimeMs int32 `kafka:"min=v1,max=v5"`
52+
Groups []ResponseGroup `kafka:"min=v0,max=v5"`
4753
}
4854

4955
type ResponseGroup struct {
50-
ErrorCode int16 `kafka:"min=v0,max=v4"`
51-
GroupID string `kafka:"min=v0,max=v4"`
52-
GroupState string `kafka:"min=v0,max=v4"`
53-
ProtocolType string `kafka:"min=v0,max=v4"`
54-
ProtocolData string `kafka:"min=v0,max=v4"`
55-
Members []ResponseGroupMember `kafka:"min=v0,max=v4"`
56-
AuthorizedOperations int32 `kafka:"min=v3,max=v4"`
56+
// We need at least one tagged field to indicate that this is a "flexible" message
57+
// type.
58+
_ struct{} `kafka:"min=v5,max=v5,tag"`
59+
ErrorCode int16 `kafka:"min=v0,max=v5"`
60+
GroupID string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
61+
GroupState string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
62+
ProtocolType string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
63+
ProtocolData string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
64+
Members []ResponseGroupMember `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
65+
AuthorizedOperations int32 `kafka:"min=v3,max=v5"`
5766
}
5867

5968
type ResponseGroupMember struct {
60-
MemberID string `kafka:"min=v0,max=v4"`
61-
GroupInstanceID string `kafka:"min=v4,max=v4,nullable"`
62-
ClientID string `kafka:"min=v0,max=v4"`
63-
ClientHost string `kafka:"min=v0,max=v4"`
64-
MemberMetadata []byte `kafka:"min=v0,max=v4"`
65-
MemberAssignment []byte `kafka:"min=v0,max=v4"`
69+
// We need at least one tagged field to indicate that this is a "flexible" message
70+
// type.
71+
_ struct{} `kafka:"min=v5,max=v5,tag"`
72+
MemberID string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
73+
GroupInstanceID string `kafka:"min=v4,max=v4,nullable|min=v5,max=v5,compact,nullable"`
74+
ClientID string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
75+
ClientHost string `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
76+
MemberMetadata []byte `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
77+
MemberAssignment []byte `kafka:"min=v0,max=v4|min=v5,max=v5,compact"`
6678
}
6779

6880
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package describegroups_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/describegroups"
7+
"github.com/segmentio/kafka-go/protocol/prototest"
8+
)
9+
10+
const (
11+
v0 = 0
12+
v1 = 1
13+
v2 = 2
14+
v3 = 3
15+
v4 = 4
16+
v5 = 5
17+
)
18+
19+
func TestDescribeGroupsRequest(t *testing.T) {
20+
prototest.TestRequest(t, v0, &describegroups.Request{
21+
Groups: []string{"test-group"},
22+
})
23+
24+
prototest.TestRequest(t, v1, &describegroups.Request{
25+
Groups: []string{"test-group"},
26+
})
27+
28+
prototest.TestRequest(t, v2, &describegroups.Request{
29+
Groups: []string{"test-group"},
30+
})
31+
32+
prototest.TestRequest(t, v3, &describegroups.Request{
33+
Groups: []string{"test-group"},
34+
IncludeAuthorizedOperations: true,
35+
})
36+
37+
prototest.TestRequest(t, v4, &describegroups.Request{
38+
Groups: []string{"test-group"},
39+
IncludeAuthorizedOperations: true,
40+
})
41+
42+
prototest.TestRequest(t, v5, &describegroups.Request{
43+
Groups: []string{"test-group"},
44+
IncludeAuthorizedOperations: true,
45+
})
46+
}
47+
48+
func TestDescribeGroupsResponse(t *testing.T) {
49+
prototest.TestResponse(t, v0, &describegroups.Response{
50+
Groups: []describegroups.ResponseGroup{
51+
{
52+
ErrorCode: 0,
53+
GroupID: "test-group",
54+
GroupState: "Stable",
55+
ProtocolType: "consumer",
56+
ProtocolData: "range",
57+
Members: []describegroups.ResponseGroupMember{
58+
{
59+
MemberID: "consumer-1",
60+
ClientID: "client-1",
61+
ClientHost: "/127.0.0.1",
62+
MemberMetadata: []byte{0x00, 0x01},
63+
MemberAssignment: []byte{0x00, 0x02},
64+
},
65+
},
66+
},
67+
},
68+
})
69+
70+
prototest.TestResponse(t, v1, &describegroups.Response{
71+
ThrottleTimeMs: 100,
72+
Groups: []describegroups.ResponseGroup{
73+
{
74+
ErrorCode: 0,
75+
GroupID: "test-group",
76+
GroupState: "Stable",
77+
ProtocolType: "consumer",
78+
ProtocolData: "range",
79+
Members: []describegroups.ResponseGroupMember{
80+
{
81+
MemberID: "consumer-1",
82+
ClientID: "client-1",
83+
ClientHost: "/127.0.0.1",
84+
MemberMetadata: []byte{0x00, 0x01},
85+
MemberAssignment: []byte{0x00, 0x02},
86+
},
87+
},
88+
},
89+
},
90+
})
91+
92+
prototest.TestResponse(t, v2, &describegroups.Response{
93+
ThrottleTimeMs: 100,
94+
Groups: []describegroups.ResponseGroup{
95+
{
96+
ErrorCode: 0,
97+
GroupID: "test-group",
98+
GroupState: "Stable",
99+
ProtocolType: "consumer",
100+
ProtocolData: "range",
101+
Members: []describegroups.ResponseGroupMember{
102+
{
103+
MemberID: "consumer-1",
104+
ClientID: "client-1",
105+
ClientHost: "/127.0.0.1",
106+
MemberMetadata: []byte{0x00, 0x01},
107+
MemberAssignment: []byte{0x00, 0x02},
108+
},
109+
},
110+
},
111+
},
112+
})
113+
114+
prototest.TestResponse(t, v3, &describegroups.Response{
115+
ThrottleTimeMs: 100,
116+
Groups: []describegroups.ResponseGroup{
117+
{
118+
ErrorCode: 0,
119+
GroupID: "test-group",
120+
GroupState: "Stable",
121+
ProtocolType: "consumer",
122+
ProtocolData: "range",
123+
AuthorizedOperations: 2147483647,
124+
Members: []describegroups.ResponseGroupMember{
125+
{
126+
MemberID: "consumer-1",
127+
ClientID: "client-1",
128+
ClientHost: "/127.0.0.1",
129+
MemberMetadata: []byte{0x00, 0x01},
130+
MemberAssignment: []byte{0x00, 0x02},
131+
},
132+
},
133+
},
134+
},
135+
})
136+
137+
prototest.TestResponse(t, v4, &describegroups.Response{
138+
ThrottleTimeMs: 100,
139+
Groups: []describegroups.ResponseGroup{
140+
{
141+
ErrorCode: 0,
142+
GroupID: "test-group",
143+
GroupState: "Stable",
144+
ProtocolType: "consumer",
145+
ProtocolData: "range",
146+
AuthorizedOperations: 2147483647,
147+
Members: []describegroups.ResponseGroupMember{
148+
{
149+
MemberID: "consumer-1",
150+
GroupInstanceID: "instance-1",
151+
ClientID: "client-1",
152+
ClientHost: "/127.0.0.1",
153+
MemberMetadata: []byte{0x00, 0x01},
154+
MemberAssignment: []byte{0x00, 0x02},
155+
},
156+
},
157+
},
158+
},
159+
})
160+
161+
prototest.TestResponse(t, v5, &describegroups.Response{
162+
ThrottleTimeMs: 100,
163+
Groups: []describegroups.ResponseGroup{
164+
{
165+
ErrorCode: 0,
166+
GroupID: "test-group",
167+
GroupState: "Stable",
168+
ProtocolType: "consumer",
169+
ProtocolData: "range",
170+
AuthorizedOperations: 2147483647,
171+
Members: []describegroups.ResponseGroupMember{
172+
{
173+
MemberID: "consumer-1",
174+
GroupInstanceID: "instance-1",
175+
ClientID: "client-1",
176+
ClientHost: "/127.0.0.1",
177+
MemberMetadata: []byte{0x00, 0x01},
178+
MemberAssignment: []byte{0x00, 0x02},
179+
},
180+
},
181+
},
182+
},
183+
})
184+
}

0 commit comments

Comments
 (0)