Skip to content

Commit b3af12e

Browse files
author
Apurva Telang
committed
Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response
1 parent f9e0264 commit b3af12e

File tree

3 files changed

+63
-13
lines changed

3 files changed

+63
-13
lines changed

kafka/admin/client.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
3+
from collections import defaultdict, namedtuple
44
import copy
55
import logging
66
import socket
7+
from itertools import izip
78

89
from . import ConfigResourceType
910
from kafka.vendor import six
@@ -17,9 +18,11 @@
1718
from kafka.protocol.admin import (
1819
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
1920
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
21+
from kafka.protocol.types import Array
22+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
2023
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2124
from kafka.protocol.metadata import MetadataRequest
22-
from kafka.structs import TopicPartition, OffsetAndMetadata
25+
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
2326
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
2427
ACLResourcePatternType
2528
from kafka.version import __version__
@@ -1000,22 +1003,47 @@ def _describe_consumer_groups_process_response(self, response):
10001003
"""Process a DescribeGroupsResponse into a group description."""
10011004
if response.API_VERSION <= 3:
10021005
assert len(response.groups) == 1
1003-
# TODO need to implement converting the response tuple into
1004-
# a more accessible interface like a namedtuple and then stop
1005-
# hardcoding tuple indices here. Several Java examples,
1006-
# including KafkaAdminClient.java
1007-
group_description = response.groups[0]
1008-
error_code = group_description[0]
1006+
for response_field, response_name in izip(response.SCHEMA.fields, response.SCHEMA.names):
1007+
if type(response_field) == Array:
1008+
described_groups = response.__dict__[response_name]
1009+
described_groups_field_schema = response_field.array_of
1010+
for described_group in described_groups:
1011+
described_group_information_list = []
1012+
is_consumer_protocol_type = False
1013+
for (described_group_information, group_information_name, group_information_field) in izip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1014+
if group_information_name == 'protocol_type':
1015+
protocol_type = described_group_information
1016+
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1017+
if type(group_information_field) == Array:
1018+
member_information_list = []
1019+
member_schema = group_information_field.array_of
1020+
for members in described_group_information:
1021+
member_information = []
1022+
for (member, member_field, member_name) in izip(members, member_schema.fields, member_schema.names):
1023+
if member_name == 'member_metadata' and is_consumer_protocol_type:
1024+
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
1025+
elif member_name == 'member_assignment' and is_consumer_protocol_type:
1026+
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
1027+
else:
1028+
member_information.append(member)
1029+
else:
1030+
member_info_tuple = MemberInformation._make(member_information)
1031+
member_information_list.append(member_info_tuple)
1032+
else:
1033+
described_group_information_list.append(member_information_list)
1034+
else:
1035+
described_group_information_list.append(described_group_information)
1036+
else:
1037+
if response.API_VERSION <=2:
1038+
described_group_information_list.append([])
1039+
group_description = GroupInformation._make(described_group_information_list)
1040+
error_code = group_description.error_code
10091041
error_type = Errors.for_code(error_code)
10101042
# Java has the note: KAFKA-6789, we can retry based on the error code
10111043
if error_type is not Errors.NoError:
10121044
raise error_type(
10131045
"DescribeGroupsResponse failed with response '{}'."
10141046
.format(response))
1015-
# TODO Java checks the group protocol type, and if consumer
1016-
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
1017-
# the members' partition assignments... that hasn't yet been
1018-
# implemented here so just return the raw struct results
10191047
else:
10201048
raise NotImplementedError(
10211049
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."

kafka/structs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
2121
["offset", "timestamp"])
2222

23+
MemberInformation = namedtuple("MemberInformation",
24+
["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])
25+
26+
GroupInformation = namedtuple("GroupInformation",
27+
["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"])
2328

2429
# Define retry policy for async producer
2530
# Limit value: int >= 0, 0 means no retries

test/test_admin_integration.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from test.testutil import env_kafka_version
44

5-
from kafka.errors import NoError
5+
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
66
from kafka.admin import (
77
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
88

@@ -138,3 +138,20 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
138138

139139
with pytest.raises(ValueError):
140140
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
141+
142+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
143+
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
144+
"""Tests that the describe consumer group call fails if the group coordinator is not available
145+
"""
146+
with pytest.raises(GroupCoordinatorNotAvailableError):
147+
group_description = kafka_admin_client.describe_consumer_groups(['test'])
148+
149+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
150+
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
151+
"""Tests that the describe consumer group call returns valid consumer group information
152+
"""
153+
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest')
154+
consumer.poll(timeout_ms=20)
155+
output = kafka_admin_client.describe_consumer_groups(['testgrp'])
156+
assert output[0].group == 'testgrp'
157+
assert output[0].members[0].member_metadata.subscription[0] == topic

0 commit comments

Comments
 (0)