Skip to content

receive_event: Make it optionally semi-nonblocking #2923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions kernelci/api/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ def unsubscribe_filters(self, sub_id):
self._filters.pop(sub_id)
self.api.unsubscribe(sub_id)

def receive_event_data(self, sub_id):
"""Receive CloudEvent from Pub/Sub and return its data payload"""
return self.api.receive_event(sub_id).data
def receive_event_data(self, sub_id, block=True):
"""Receive CloudEvent from Pub/Sub and return its data payload
If block is False, on receiving an "keep-alive" event,
such as "BEEP" ping, it will return None instead of the data.
Without this, it will block until an event is received.
"""
event = self.api.receive_event(sub_id, block=block)
if event is None:
return None
return event.data

def pop_event_data(self, list_name):
"""Receive CloudEvent from Redis list and return its data payload"""
Expand Down
6 changes: 5 additions & 1 deletion kernelci/api/latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def unsubscribe(self, sub_id: int):
def send_event(self, channel: str, data):
self._post('/'.join(['publish', channel]), data)

def receive_event(self, sub_id: int):
def receive_event(self, sub_id: int, block: bool = True):
path = '/'.join(['listen', str(sub_id)])
while True:
resp = self._get(path)
Expand All @@ -172,6 +172,10 @@ def receive_event(self, sub_id: int):
continue
event = from_json(data)
if event.data == 'BEEP':
if not block:
# If block is False, return None
# for semi-nonblocking operation
return None
continue
return event

Expand Down