From baf69901c165c994e6a7b3bacdcdd20709545f07 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 27 May 2025 13:37:45 -0700 Subject: [PATCH 1/3] Add synchronized decorator; add lock to subscription state --- kafka/consumer/subscription_state.py | 37 ++++++++++++++++++++++++++-- kafka/util.py | 7 ++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index cc3675b1d..2cec7d465 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -15,6 +15,7 @@ import logging import random import re +import threading import time from kafka.vendor import six @@ -22,7 +23,7 @@ import kafka.errors as Errors from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata -from kafka.util import ensure_valid_topic_name +from kafka.util import ensure_valid_topic_name, synchronized log = logging.getLogger(__name__) @@ -84,6 +85,7 @@ def __init__(self, offset_reset_strategy='earliest'): self.assignment = OrderedDict() self.rebalance_listener = None self.listeners = [] + self._lock = threading.RLock() def _set_subscription_type(self, subscription_type): if not isinstance(subscription_type, SubscriptionType): @@ -93,6 +95,7 @@ def _set_subscription_type(self, subscription_type): elif self.subscription_type != subscription_type: raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + @synchronized def subscribe(self, topics=(), pattern=None, listener=None): """Subscribe to a list of topics, or a topic regex pattern. @@ -147,6 +150,7 @@ def subscribe(self, topics=(), pattern=None, listener=None): raise TypeError('listener must be a ConsumerRebalanceListener') self.rebalance_listener = listener + @synchronized def change_subscription(self, topics): """Change the topic subscription. @@ -178,6 +182,7 @@ def change_subscription(self, topics): self.subscription = set(topics) self._group_subscription.update(topics) + @synchronized def group_subscribe(self, topics): """Add topics to the current group subscription. @@ -191,6 +196,7 @@ def group_subscribe(self, topics): raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) self._group_subscription.update(topics) + @synchronized def reset_group_subscription(self): """Reset the group's subscription to only contain topics subscribed by this consumer.""" if not self.partitions_auto_assigned(): @@ -198,6 +204,7 @@ def reset_group_subscription(self): assert self.subscription is not None, 'Subscription required' self._group_subscription.intersection_update(self.subscription) + @synchronized def assign_from_user(self, partitions): """Manually assign a list of TopicPartitions to this consumer. @@ -222,6 +229,7 @@ def assign_from_user(self, partitions): self._set_assignment({partition: self.assignment.get(partition, TopicPartitionState()) for partition in partitions}) + @synchronized def assign_from_subscribed(self, assignments): """Update the assignment to the specified partitions @@ -258,6 +266,7 @@ def _set_assignment(self, partition_states, randomize=False): for tp in topic_partitions[topic]: self.assignment[tp] = partition_states[tp] + @synchronized def unsubscribe(self): """Clear all topic subscriptions and partition assignments""" self.subscription = None @@ -266,6 +275,7 @@ def unsubscribe(self): self.subscribed_pattern = None self.subscription_type = SubscriptionType.NONE + @synchronized def group_subscription(self): """Get the topic subscription for the group. @@ -281,6 +291,7 @@ def group_subscription(self): """ return self._group_subscription + @synchronized def seek(self, partition, offset): """Manually specify the fetch offset for a TopicPartition. @@ -298,15 +309,18 @@ def seek(self, partition, offset): raise TypeError("offset must be type in or OffsetAndMetadata") self.assignment[partition].seek(offset) + @synchronized def assigned_partitions(self): """Return set of TopicPartitions in current assignment.""" return set(self.assignment.keys()) + @synchronized def paused_partitions(self): """Return current set of paused TopicPartitions.""" return set(partition for partition in self.assignment if self.is_paused(partition)) + @synchronized def fetchable_partitions(self): """Return ordered list of TopicPartitions that should be Fetched.""" fetchable = list() @@ -315,10 +329,12 @@ def fetchable_partitions(self): fetchable.append(partition) return fetchable + @synchronized def partitions_auto_assigned(self): """Return True unless user supplied partitions manually.""" return self.subscription_type in (SubscriptionType.AUTO_TOPICS, SubscriptionType.AUTO_PATTERN) + @synchronized def all_consumed_offsets(self): """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}""" all_consumed = {} @@ -327,6 +343,7 @@ def all_consumed_offsets(self): all_consumed[partition] = state.position return all_consumed + @synchronized def request_offset_reset(self, partition, offset_reset_strategy=None): """Mark partition for offset reset using specified or default strategy. @@ -338,23 +355,28 @@ def request_offset_reset(self, partition, offset_reset_strategy=None): offset_reset_strategy = self._default_offset_reset_strategy self.assignment[partition].reset(offset_reset_strategy) + @synchronized def set_reset_pending(self, partitions, next_allowed_reset_time): for partition in partitions: self.assignment[partition].set_reset_pending(next_allowed_reset_time) + @synchronized def has_default_offset_reset_policy(self): """Return True if default offset reset policy is Earliest or Latest""" return self._default_offset_reset_strategy != OffsetResetStrategy.NONE + @synchronized def is_offset_reset_needed(self, partition): return self.assignment[partition].awaiting_reset + @synchronized def has_all_fetch_positions(self): for state in six.itervalues(self.assignment): if not state.has_valid_position: return False return True + @synchronized def missing_fetch_positions(self): missing = set() for partition, state in six.iteritems(self.assignment): @@ -362,9 +384,11 @@ def missing_fetch_positions(self): missing.add(partition) return missing + @synchronized def has_valid_position(self, partition): return partition in self.assignment and self.assignment[partition].has_valid_position + @synchronized def reset_missing_positions(self): partitions_with_no_offsets = set() for tp, state in six.iteritems(self.assignment): @@ -377,6 +401,7 @@ def reset_missing_positions(self): if partitions_with_no_offsets: raise Errors.NoOffsetForPartitionError(partitions_with_no_offsets) + @synchronized def partitions_needing_reset(self): partitions = set() for tp, state in six.iteritems(self.assignment): @@ -384,25 +409,32 @@ def partitions_needing_reset(self): partitions.add(tp) return partitions + @synchronized def is_assigned(self, partition): return partition in self.assignment + @synchronized def is_paused(self, partition): return partition in self.assignment and self.assignment[partition].paused + @synchronized def is_fetchable(self, partition): return partition in self.assignment and self.assignment[partition].is_fetchable() + @synchronized def pause(self, partition): self.assignment[partition].pause() + @synchronized def resume(self, partition): self.assignment[partition].resume() + @synchronized def reset_failed(self, partitions, next_retry_time): for partition in partitions: self.assignment[partition].reset_failed(next_retry_time) + @synchronized def move_partition_to_end(self, partition): if partition in self.assignment: try: @@ -411,6 +443,7 @@ def move_partition_to_end(self, partition): state = self.assignment.pop(partition) self.assignment[partition] = state + @synchronized def position(self, partition): return self.assignment[partition].position @@ -441,7 +474,7 @@ def reset(self, strategy): self.next_allowed_retry_time = None def is_reset_allowed(self): - return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time() + return (self.next_allowed_retry_time or -1) < time.time() @property def awaiting_reset(self): diff --git a/kafka/util.py b/kafka/util.py index bfb9365ad..81858a49b 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -129,3 +129,10 @@ class Dict(dict): See: https://docs.python.org/2/library/weakref.html """ pass + + +def synchronized(func): + def wrapper(self, *args, **kwargs): + with self._lock: + return func(self, *args, **kwargs) + return wrapper From 5e125fa6866068527604856b594a3ac13165436b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 27 May 2025 13:39:27 -0700 Subject: [PATCH 2/3] revert is_reset_allowed change --- kafka/consumer/subscription_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 2cec7d465..f99f01615 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -474,7 +474,7 @@ def reset(self, strategy): self.next_allowed_retry_time = None def is_reset_allowed(self): - return (self.next_allowed_retry_time or -1) < time.time() + return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time() @property def awaiting_reset(self): From cb4dd7765f93ec694d9fe6cdc23aaacca0a380e4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 1 Jun 2025 07:10:53 -0700 Subject: [PATCH 3/3] update_wrapper --- kafka/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/util.py b/kafka/util.py index 81858a49b..658c17d59 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, division import binascii +import functools import re import time import weakref @@ -135,4 +136,5 @@ def synchronized(func): def wrapper(self, *args, **kwargs): with self._lock: return func(self, *args, **kwargs) + functools.update_wrapper(wrapper, func) return wrapper