Skip to content

Commit b187aac

Browse files
author
Roey Prat
committed
Implements XCLAIM
1 parent a8c998d commit b187aac

File tree

1 file changed

+59
-8
lines changed

1 file changed

+59
-8
lines changed

redis/client.py

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ def parse_zscan(response, **options):
326326
it = iter(r)
327327
return long(cursor), list(izip(it, imap(score_cast_func, it)))
328328

329-
330329
def parse_slowlog_get(response, **options):
331330
return [{
332331
'id': item[0],
@@ -2008,17 +2007,17 @@ def xreadgroup(self, groupname, consumername, count=None, block=None,
20082007
if streams is None:
20092008
streams = {}
20102009
pieces = ['GROUP', groupname, consumername]
2011-
if block is not None:
2012-
if not isinstance(block, int) or block < 0:
2013-
raise RedisError("XREAD block must be a non-negative integer")
2014-
pieces.append("BLOCK")
2015-
pieces.append(str(block))
20162010
if count is not None:
20172011
if not isinstance(count, int) or count < 1:
2018-
raise RedisError("XREAD count must be a positive integer")
2012+
raise RedisError("XREADGROUP count must be a positive integer")
20192013
pieces.append("COUNT")
20202014
pieces.append(str(count))
2021-
2015+
if block is not None:
2016+
if not isinstance(block, int) or block < 0:
2017+
raise RedisError("XREADGROUP block must be a non-negative "
2018+
"integer")
2019+
pieces.append("BLOCK")
2020+
pieces.append(str(block))
20222021
pieces.append("STREAMS")
20232022
ids = []
20242023
for partial_stream in iteritems(streams):
@@ -2051,6 +2050,58 @@ def xpending(self, name, groupname, start=None, end=None, count=None,
20512050
pieces.append(consumername)
20522051
return self.execute_command('XPENDING', *pieces)
20532052

2053+
def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
2054+
idle=None, time=None, retrycount=None, force=False,
2055+
justid=False):
2056+
"""
2057+
Changes the ownership of a pending message.
2058+
name: name of the stream.
2059+
groupname: name of the consumer group.
2060+
consumername: name of a consumer that claims the message.
2061+
min_idle_time: filter messages that were idle less than this amount of
2062+
milliseconds
2063+
message_ids: non-empty list or tuple of message IDs to claim
2064+
idle: optional. Set the idle time (last time it was delivered) of the
2065+
message in ms
2066+
time: optional integer. This is the same as idle but instead of a
2067+
relative amount of milliseconds, it sets the idle time to a specific
2068+
Unix time (in milliseconds).
2069+
retrycount: optional integer. set the retry counter to the specified
2070+
value. This counter is incremented every time a message is delivered
2071+
again.
2072+
force: optional boolean, false by default. Creates the pending message
2073+
entry in the PEL even if certain specified IDs are not already in the
2074+
PEL assigned to a different client.
2075+
justid: optional boolean, false by default. Return just an array of IDs
2076+
of messages successfully claimed, without returning the actual message
2077+
"""
2078+
if not isinstance(min_idle_time, int) or min_idle_time < 0:
2079+
raise RedisError("XCLAIM min_idle_time must be a non negative "
2080+
"integer")
2081+
if not isinstance(message_ids, (list, tuple)) or not message_ids:
2082+
raise RedisError("XCLAIM message_ids must be a non empty list or "
2083+
"tuple of message IDs to claim")
2084+
2085+
pieces = [name, groupname, consumername, str(min_idle_time)]
2086+
pieces.extend(list(message_ids))
2087+
2088+
optional_ints = {idle: 'idle', time: 'time', retrycount: 'retrycount'}
2089+
for param, param_name in optional_ints.items():
2090+
if param is not None:
2091+
if not isinstance(param, int):
2092+
raise RedisError("XCLAIM {} must be an integer"
2093+
.format(param_name))
2094+
pieces.append(str(param))
2095+
2096+
optional_bools = {force: 'force', justid: 'justid'}
2097+
for param, param_name in optional_bools.items():
2098+
if param:
2099+
if not isinstance(param, bool):
2100+
raise RedisError("XCLAIM {} must be a boolean"
2101+
.format(param_name))
2102+
pieces.append(param_name.upper())
2103+
return self.execute_command('XCLAIM', *pieces)
2104+
20542105
# SORTED SET COMMANDS
20552106
def zadd(self, name, *args, **kwargs):
20562107
"""

0 commit comments

Comments
 (0)