Skip to content

LT and GT support for ZADD #1509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 29, 2021
163 changes: 156 additions & 7 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,15 +530,15 @@ def parse_client_info(value):
"key1=value1 key2=value2 key3=value3"
"""
client_info = {}
infos = value.split(" ")
infos = str_if_bytes(value).split(" ")
for info in infos:
key, value = info.split("=")
client_info[key] = value

# Those fields are definded as int in networking.c
for int_key in {"id", "age", "idle", "db", "sub", "psub",
"multi", "qbuf", "qbuf-free", "obl",
"oll", "omem"}:
"argv-mem", "oll", "omem", "tot-mem"}:
client_info[int_key] = int(client_info[int_key])
return client_info

Expand All @@ -561,7 +561,7 @@ class Redis:
"""
RESPONSE_CALLBACKS = {
**string_keys_to_dict(
'AUTH EXPIRE EXPIREAT HEXISTS HMSET MOVE MSETNX PERSIST '
'AUTH COPY EXPIRE EXPIREAT HEXISTS HMSET MOVE MSETNX PERSIST '
'PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX',
bool
),
Expand Down Expand Up @@ -620,6 +620,7 @@ class Redis:
'CLIENT ID': int,
'CLIENT KILL': parse_client_kill,
'CLIENT LIST': parse_client_list,
'CLIENT INFO': parse_client_info,
'CLIENT SETNAME': bool_ok,
'CLIENT UNBLOCK': lambda r: r and int(r) == 1 or False,
'CLIENT PAUSE': bool_ok,
Expand Down Expand Up @@ -1209,14 +1210,16 @@ def client_kill(self, address):
"Disconnects the client at ``address`` (ip:port)"
return self.execute_command('CLIENT KILL', address)

def client_kill_filter(self, _id=None, _type=None, addr=None, skipme=None):
def client_kill_filter(self, _id=None, _type=None, addr=None,
skipme=None, laddr=None):
"""
Disconnects client(s) using a variety of filter options
:param id: Kills a client by its unique ID field
:param type: Kills a client by type where type is one of 'normal',
'master', 'slave' or 'pubsub'
:param addr: Kills a client by its 'address:port'
:param skipme: If True, then the client calling the command
:param laddr: Kills a cient by its 'local (bind) address:port'
will not get killed even if it is identified by one of the filter
options. If skipme is not provided, the server defaults to skipme=True
"""
Expand All @@ -1238,11 +1241,20 @@ def client_kill_filter(self, _id=None, _type=None, addr=None, skipme=None):
args.extend((b'ID', _id))
if addr is not None:
args.extend((b'ADDR', addr))
if laddr is not None:
args.extend((b'LADDR', laddr))
if not args:
raise DataError("CLIENT KILL <filter> <value> ... ... <filter> "
"<value> must specify at least one filter")
return self.execute_command('CLIENT KILL', *args)

def client_info(self):
"""
Returns information and statistics about the current
client connection.
"""
return self.execute_command('CLIENT INFO')

def client_list(self, _type=None):
"""
Returns a list of currently connected clients.
Expand Down Expand Up @@ -1292,6 +1304,12 @@ def client_pause(self, timeout):
raise DataError("CLIENT PAUSE timeout must be an integer")
return self.execute_command('CLIENT PAUSE', str(timeout))

def client_unpause(self):
"""
Unpause all redis clients
"""
return self.execute_command('CLIENT UNPAUSE')

def readwrite(self):
"Disables read queries for a connection to a Redis Cluster slave node"
return self.execute_command('READWRITE')
Expand Down Expand Up @@ -1612,6 +1630,24 @@ def bitpos(self, key, bit, start=None, end=None):
"when end is specified")
return self.execute_command('BITPOS', *params)

def copy(self, source, destination, destination_db=None, replace=False):
"""
Copy the value stored in the ``source`` key to the ``destination`` key.

``destination_db`` an alternative destination database. By default,
the ``destination`` key is created in the source Redis database.

``replace`` whether the ``destination`` key should be removed before
copying the value to it. By default, the value is not copied if
the ``destination`` key already exists.
"""
params = [source, destination]
if destination_db is not None:
params.extend(["DB", destination_db])
if replace:
params.append("REPLACE")
return self.execute_command('COPY', *params)

def decr(self, name, amount=1):
"""
Decrements the value of ``key`` by ``amount``. If no key exists,
Expand Down Expand Up @@ -1671,6 +1707,66 @@ def get(self, name):
"""
return self.execute_command('GET', name)

def getdel(self, name):
"""
Get the value at key ``name`` and delete the key. This command
is similar to GET, except for the fact that it also deletes
the key on success (if and only if the key's value type
is a string).
"""
return self.execute_command('GETDEL', name)

def getex(self, name,
ex=None, px=None, exat=None, pxat=None, persist=False):
"""
Get the value of key and optionally set its expiration.
GETEX is similar to GET, but is a write command with
additional options. All time parameters can be given as
datetime.timedelta or integers.

``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.

``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.

``exat`` sets an expire flag on key ``name`` for ``ex`` seconds,
specified in unix time.

``pxat`` sets an expire flag on key ``name`` for ``ex`` milliseconds,
specified in unix time.

``persist`` remove the time to live associated with ``name``.
"""

pieces = []
# similar to set command
if ex is not None:
pieces.append('EX')
if isinstance(ex, datetime.timedelta):
ex = int(ex.total_seconds())
pieces.append(ex)
if px is not None:
pieces.append('PX')
if isinstance(px, datetime.timedelta):
px = int(px.total_seconds() * 1000)
pieces.append(px)
# similar to pexpireat command
if exat is not None:
pieces.append('EXAT')
if isinstance(exat, datetime.datetime):
s = int(exat.microsecond / 1000000)
exat = int(mod_time.mktime(exat.timetuple())) + s
pieces.append(exat)
if pxat is not None:
pieces.append('PXAT')
if isinstance(pxat, datetime.datetime):
ms = int(pxat.microsecond / 1000)
pxat = int(mod_time.mktime(pxat.timetuple())) * 1000 + ms
pieces.append(pxat)
if persist:
pieces.append('PERSIST')

return self.execute_command('GETEX', name, *pieces)

def __getitem__(self, name):
"""
Return the value at key ``name``, raises a KeyError if the key
Expand Down Expand Up @@ -1802,6 +1898,26 @@ def pttl(self, name):
"Returns the number of milliseconds until the key ``name`` will expire"
return self.execute_command('PTTL', name)

def hrandfield(self, key, count=None, withvalues=False):
"""
Return a random field from the hash value stored at key.

count: if the argument is positive, return an array of distinct fields.
If called with a negative count, the behavior changes and the command
is allowed to return the same field multiple times. In this case,
the number of returned fields is the absolute value of the
specified count.
withvalues: The optional WITHVALUES modifier changes the reply so it
includes the respective values of the randomly selected hash fields.
"""
params = []
if count is not None:
params.append(count)
if withvalues:
params.append("WITHVALUES")

return self.execute_command("HRANDFIELD", key, *params)

def randomkey(self):
"Returns the name of a random key"
return self.execute_command('RANDOMKEY')
Expand Down Expand Up @@ -2434,15 +2550,16 @@ def xack(self, name, groupname, *ids):
"""
return self.execute_command('XACK', name, groupname, *ids)

def xadd(self, name, fields, id='*', maxlen=None, approximate=True):
def xadd(self, name, fields, id='*', maxlen=None, approximate=True,
nomkstream=False):
"""
Add to a stream.
name: name of the stream
fields: dict of field/value pairs to insert into the stream
id: Location to insert this record. By default it is appended.
maxlen: truncate old stream members beyond this size
approximate: actual stream length may be slightly more than maxlen

nomkstream: When set to true, do not make a stream
"""
pieces = []
if maxlen is not None:
Expand All @@ -2452,6 +2569,8 @@ def xadd(self, name, fields, id='*', maxlen=None, approximate=True):
if approximate:
pieces.append(b'~')
pieces.append(str(maxlen))
if nomkstream:
pieces.append(b'NOMKSTREAM')
pieces.append(id)
if not isinstance(fields, dict) or len(fields) == 0:
raise DataError('XADD fields must be a non-empty dict')
Expand Down Expand Up @@ -2747,7 +2866,8 @@ def xtrim(self, name, maxlen, approximate=True):
return self.execute_command('XTRIM', name, *pieces)

# SORTED SET COMMANDS
def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False):
def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False,
gt=None, lt=None):
"""
Set any number of element-name, score pairs to the key ``name``. Pairs
are specified as a dict of element-names keys to score values.
Expand Down Expand Up @@ -2778,6 +2898,9 @@ def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False):
if incr and len(mapping) != 1:
raise DataError("ZADD option 'incr' only works when passing a "
"single element/score pair")
if nx is True and (gt is not None or lt is not None):
raise DataError("Only one of 'nx', 'lt', or 'gr' may be defined.")

pieces = []
options = {}
if nx:
Expand All @@ -2789,6 +2912,10 @@ def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False):
if incr:
pieces.append(b'INCR')
options['as_score'] = True
if gt:
pieces.append(b'GT')
if lt:
pieces.append(b'LT')
for pair in mapping.items():
pieces.append(pair[1])
pieces.append(pair[0])
Expand Down Expand Up @@ -2846,6 +2973,28 @@ def zpopmin(self, name, count=None):
}
return self.execute_command('ZPOPMIN', name, *args, **options)

def zrandmember(self, key, count=None, withscores=False):
"""
Return a random element from the sorted set value stored at key.

``count`` if the argument is positive, return an array of distinct
fields. If called with a negative count, the behavior changes and
the command is allowed to return the same field multiple times.
In this case, the number of returned fields is the absolute value
of the specified count.

``withscores`` The optional WITHSCORES modifier changes the reply so it
includes the respective scores of the randomly selected elements from
the sorted set.
"""
params = []
if count is not None:
params.append(count)
if withscores:
params.append("WITHSCORES")

return self.execute_command("ZRANDMEMBER", key, *params)

def bzpopmax(self, keys, timeout=0):
"""
ZPOPMAX a value off of the first non-empty sorted set
Expand Down
Loading