diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 852157811..b128e5548 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -250,6 +250,11 @@ def coordinator(self):
             else:
                 return self.coordinator_id
 
+    def connected(self):
+        """Return True iff the coordinator node is connected"""
+        with self._lock:
+            return self.coordinator_id is not None and self._client.connected(self.coordinator_id)
+
     def ensure_coordinator_ready(self, timeout_ms=None):
         """Block until the coordinator for this group is known.
 
@@ -1058,28 +1063,28 @@ def _run_once(self):
         self.coordinator._client._lock.acquire()
         self.coordinator._lock.acquire()
         try:
-            if self.enabled and self.coordinator.state is MemberState.STABLE:
-                # TODO: When consumer.wakeup() is implemented, we need to
-                # disable here to prevent propagating an exception to this
-                # heartbeat thread
-                # must get client._lock, or maybe deadlock at heartbeat
-                # failure callback in consumer poll
-                self.coordinator._client.poll(timeout_ms=0)
-
             if not self.enabled:
                 heartbeat_log.debug('Heartbeat disabled. Waiting')
                 self.coordinator._client._lock.release()
                 self.coordinator._lock.wait()
-                heartbeat_log.debug('Heartbeat re-enabled.')
+                if self.enabled:
+                    heartbeat_log.debug('Heartbeat re-enabled.')
+                return
 
-            elif self.coordinator.state is not MemberState.STABLE:
+            if self.coordinator.state is not MemberState.STABLE:
                 # the group is not stable (perhaps because we left the
                 # group or because the coordinator kicked us out), so
                 # disable heartbeats and wait for the main thread to rejoin.
                 heartbeat_log.debug('Group state is not stable, disabling heartbeats')
                 self.disable()
+                return
+
+            # TODO: When consumer.wakeup() is implemented, we need to
+            # disable here to prevent propagating an exception to this
+            # heartbeat thread
+            self.coordinator._client.poll(timeout_ms=0)
 
-            elif self.coordinator.coordinator_unknown():
+            if self.coordinator.coordinator_unknown():
                 future = self.coordinator.lookup_coordinator()
                 if not future.is_done or future.failed():
                     # the immediate future check ensures that we backoff
@@ -1088,6 +1093,10 @@ def _run_once(self):
                     self.coordinator._client._lock.release()
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
+            elif not self.coordinator.connected():
+                self.coordinator._client._lock.release()
+                self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
             elif self.coordinator.heartbeat.session_timeout_expired():
                 # the session timeout has expired without seeing a
                 # successful heartbeat, so we should probably make sure
@@ -1103,11 +1112,10 @@
                 self.coordinator.maybe_leave_group()
 
             elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
+                next_hb = self.coordinator.heartbeat.time_to_next_heartbeat()
+                heartbeat_log.debug('Waiting %0.1f secs to send next heartbeat', next_hb)
                 self.coordinator._client._lock.release()
-                self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+                self.coordinator._lock.wait(next_hb)
 
             else:
                 self.coordinator.heartbeat.sent_heartbeat()
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 251de566a..4ffe1d28c 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -658,6 +658,7 @@ def test_heartbeat(mocker, patched_coord):
     heartbeat.enable()
     patched_coord.state = MemberState.STABLE
     mocker.spy(patched_coord, '_send_heartbeat_request')
+    mocker.patch.object(patched_coord, 'connected', return_value=True)
     mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True)
     heartbeat._run_once()
    assert patched_coord._send_heartbeat_request.call_count == 1
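
For orientation, here is a minimal sketch of the branch order the reworked `_run_once()` follows once the `connected()` guard is in place. The stub classes, values, and the `next_action` helper below are stand-ins for illustration only, not the real kafka-python objects:

```python
# Hypothetical sketch, not part of the patch: stubs that illustrate the
# decision order of the heartbeat loop after this change.

class StubHeartbeat(object):
    def session_timeout_expired(self): return False
    def poll_timeout_expired(self): return False
    def should_heartbeat(self): return False
    def time_to_next_heartbeat(self): return 2.5  # seconds until next heartbeat is due

class StubCoordinator(object):
    config = {'retry_backoff_ms': 100}
    heartbeat = StubHeartbeat()
    def coordinator_unknown(self): return False
    def connected(self): return False  # e.g. the coordinator socket dropped

def next_action(coord):
    """Mirror the simplified branch order of the reworked heartbeat loop."""
    if coord.coordinator_unknown():
        return 'lookup coordinator, then back off'
    if not coord.connected():
        # new branch: back off instead of heartbeating over a dead connection
        return 'wait %.3f s (retry backoff)' % (coord.config['retry_backoff_ms'] / 1000.0)
    if coord.heartbeat.session_timeout_expired():
        return 'mark coordinator dead'
    if coord.heartbeat.poll_timeout_expired():
        return 'maybe leave group'
    if not coord.heartbeat.should_heartbeat():
        return 'wait %.1f s until the next heartbeat is due' % coord.heartbeat.time_to_next_heartbeat()
    return 'send heartbeat request'

print(next_action(StubCoordinator()))  # -> wait 0.100 s (retry backoff)
```

Note that the connection check sits after the coordinator-unknown lookup but before the session/poll timeout checks, so a dropped socket results in a retry-backoff wait rather than an attempt to send a heartbeat request to a disconnected node; when a heartbeat simply is not due yet, the thread now sleeps until `time_to_next_heartbeat()` instead of polling every `retry_backoff_ms`.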