diff --git a/lib/redis.rb b/lib/redis.rb index 264c436fe..4d8269494 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -34,12 +34,15 @@ def self.current=(redis) # @option options [Array String, Integer}>] :cluster List of cluster nodes to contact # @option options [Boolean] :replica Whether to use readonly replica nodes in Redis Cluster or not # @option options [Class] :connector Class of custom connector + # @option options [Hash] :distributed detail options for partitioning with client side consistent hashing (e.g. :nodes, :tag, :ring) # # @return [Redis] a new client instance def initialize(options = {}) @options = options.dup + client = Client + client = Distributed::Partitioner if options.key?(:distributed) @cluster_mode = options.key?(:cluster) - client = @cluster_mode ? Cluster : Client + client = Cluster if @cluster_mode @original_client = @client = client.new(options) @queue = Hash.new { |h, k| h[k] = [] } @@ -3500,5 +3503,6 @@ def _xread(args, keys, ids, blocking_timeout_msec) require_relative "redis/connection" require_relative "redis/client" require_relative "redis/cluster" +require_relative 'redis/distributed/partitioner' require_relative "redis/pipeline" require_relative "redis/subscribe" diff --git a/lib/redis/distributed.rb b/lib/redis/distributed.rb index 7093fb00a..9a7c0c37b 100644 --- a/lib/redis/distributed.rb +++ b/lib/redis/distributed.rb @@ -1,893 +1,32 @@ -require_relative "hash_ring" +# frozen_string_literal: true -class Redis - class Distributed - - class CannotDistribute < RuntimeError - def initialize(command) - @command = command - end +require_relative '../redis' - def message - "#{@command.to_s.upcase} cannot be used in Redis::Distributed because the keys involved need to be on the same server or because we cannot guarantee that the operation will be atomic." - end - end - - attr_reader :ring +class Redis + # For backward compatibility + HashRing = Distributed::HashRing + # Partitioning with client side consistent hashing + class Distributed + # @param node_configs [Array] List of nodes to contact + # @param options [Hash] same as Redis constructor + # @deprecated Use `Redis.new(distributed: { nodes: node_configs, tag: tag, ring: ring }, timeout: 10)` instead. def initialize(node_configs, options = {}) - @tag = options[:tag] || /^\{(.+?)\}/ - @ring = options[:ring] || HashRing.new - @node_configs = node_configs.dup - @default_options = options.dup - node_configs.each { |node_config| add_node(node_config) } - @subscribed_node = nil - end - - def node_for(key) - @ring.get_node(key_tag(key.to_s) || key.to_s) - end - - def nodes - @ring.nodes - end - - def add_node(options) - options = { :url => options } if options.is_a?(String) - options = @default_options.merge(options) - @ring.add_node Redis.new( options ) - end - - # Change the selected database for the current connection. - def select(db) - on_each_node :select, db - end - - # Ping the server. - def ping - on_each_node :ping - end - - # Echo the given string. - def echo(value) - on_each_node :echo, value - end - - # Close the connection. - def quit - on_each_node :quit - end - - # Asynchronously save the dataset to disk. - def bgsave - on_each_node :bgsave - end - - # Return the number of keys in the selected database. - def dbsize - on_each_node :dbsize - end - - # Remove all keys from all databases. - def flushall - on_each_node :flushall - end - - # Remove all keys from the current database. - def flushdb - on_each_node :flushdb - end - - # Get information and statistics about the server. - def info(cmd = nil) - on_each_node :info, cmd - end - - # Get the UNIX time stamp of the last successful save to disk. - def lastsave - on_each_node :lastsave - end - - # Listen for all requests received by the server in real time. - def monitor - raise NotImplementedError - end - - # Synchronously save the dataset to disk. - def save - on_each_node :save - end - - # Get server time: an UNIX timestamp and the elapsed microseconds in the current second. - def time - on_each_node :time - end - - # Remove the expiration from a key. - def persist(key) - node_for(key).persist(key) - end - - # Set a key's time to live in seconds. - def expire(key, seconds) - node_for(key).expire(key, seconds) - end - - # Set the expiration for a key as a UNIX timestamp. - def expireat(key, unix_time) - node_for(key).expireat(key, unix_time) - end - - # Get the time to live (in seconds) for a key. - def ttl(key) - node_for(key).ttl(key) - end - - # Set a key's time to live in milliseconds. - def pexpire(key, milliseconds) - node_for(key).pexpire(key, milliseconds) - end - - # Set the expiration for a key as number of milliseconds from UNIX Epoch. - def pexpireat(key, ms_unix_time) - node_for(key).pexpireat(key, ms_unix_time) - end - - # Get the time to live (in milliseconds) for a key. - def pttl(key) - node_for(key).pttl(key) - end - - # Return a serialized version of the value stored at a key. - def dump(key) - node_for(key).dump(key) - end - - # Create a key using the serialized value, previously obtained using DUMP. - def restore(key, ttl, serialized_value, options = {}) - node_for(key).restore(key, ttl, serialized_value, options) - end - - # Transfer a key from the connected instance to another instance. - def migrate(key, options) - raise CannotDistribute, :migrate - end - - # Delete a key. - def del(*args) - keys_per_node = args.group_by { |key| node_for(key) } - keys_per_node.inject(0) do |sum, (node, keys)| - sum + node.del(*keys) - end - end - - # Unlink keys. - def unlink(*args) - keys_per_node = args.group_by { |key| node_for(key) } - keys_per_node.inject(0) do |sum, (node, keys)| - sum + node.unlink(*keys) - end - end - - # Determine if a key exists. - def exists(key) - node_for(key).exists(key) - end - - # Find all keys matching the given pattern. - def keys(glob = "*") - on_each_node(:keys, glob).flatten - end - - # Move a key to another database. - def move(key, db) - node_for(key).move(key, db) - end - - # Return a random key from the keyspace. - def randomkey - raise CannotDistribute, :randomkey - end - - # Rename a key. - def rename(old_name, new_name) - ensure_same_node(:rename, [old_name, new_name]) do |node| - node.rename(old_name, new_name) - end - end - - # Rename a key, only if the new key does not exist. - def renamenx(old_name, new_name) - ensure_same_node(:renamenx, [old_name, new_name]) do |node| - node.renamenx(old_name, new_name) - end - end - - # Sort the elements in a list, set or sorted set. - def sort(key, options = {}) - keys = [key, options[:by], options[:store], *Array(options[:get])].compact - - ensure_same_node(:sort, keys) do |node| - node.sort(key, options) - end - end - - # Determine the type stored at key. - def type(key) - node_for(key).type(key) - end - - # Decrement the integer value of a key by one. - def decr(key) - node_for(key).decr(key) - end - - # Decrement the integer value of a key by the given number. - def decrby(key, decrement) - node_for(key).decrby(key, decrement) - end - - # Increment the integer value of a key by one. - def incr(key) - node_for(key).incr(key) - end - - # Increment the integer value of a key by the given integer number. - def incrby(key, increment) - node_for(key).incrby(key, increment) - end - - # Increment the numeric value of a key by the given float number. - def incrbyfloat(key, increment) - node_for(key).incrbyfloat(key, increment) - end - - # Set the string value of a key. - def set(key, value, options = {}) - node_for(key).set(key, value, options) - end - - # Set the time to live in seconds of a key. - def setex(key, ttl, value) - node_for(key).setex(key, ttl, value) - end - - # Set the time to live in milliseconds of a key. - def psetex(key, ttl, value) - node_for(key).psetex(key, ttl, value) - end - - # Set the value of a key, only if the key does not exist. - def setnx(key, value) - node_for(key).setnx(key, value) - end - - # Set multiple keys to multiple values. - def mset(*args) - raise CannotDistribute, :mset - end - - def mapped_mset(hash) - raise CannotDistribute, :mapped_mset - end - - # Set multiple keys to multiple values, only if none of the keys exist. - def msetnx(*args) - raise CannotDistribute, :msetnx - end - - def mapped_msetnx(hash) - raise CannotDistribute, :mapped_msetnx - end - - # Get the value of a key. - def get(key) - node_for(key).get(key) - end - - # Get the values of all the given keys as an Array. - def mget(*keys) - mapped_mget(*keys).values_at(*keys) - end - - # Get the values of all the given keys as a Hash. - def mapped_mget(*keys) - keys.group_by { |k| node_for k }.inject({}) do |results, (node, subkeys)| - results.merge! node.mapped_mget(*subkeys) - end - end - - # Overwrite part of a string at key starting at the specified offset. - def setrange(key, offset, value) - node_for(key).setrange(key, offset, value) - end - - # Get a substring of the string stored at a key. - def getrange(key, start, stop) - node_for(key).getrange(key, start, stop) - end - - # Sets or clears the bit at offset in the string value stored at key. - def setbit(key, offset, value) - node_for(key).setbit(key, offset, value) - end - - # Returns the bit value at offset in the string value stored at key. - def getbit(key, offset) - node_for(key).getbit(key, offset) - end - - # Append a value to a key. - def append(key, value) - node_for(key).append(key, value) - end - - # Count the number of set bits in a range of the string value stored at key. - def bitcount(key, start = 0, stop = -1) - node_for(key).bitcount(key, start, stop) - end - - # Perform a bitwise operation between strings and store the resulting string in a key. - def bitop(operation, destkey, *keys) - ensure_same_node(:bitop, [destkey] + keys) do |node| - node.bitop(operation, destkey, *keys) - end - end - - # Return the position of the first bit set to 1 or 0 in a string. - def bitpos(key, bit, start=nil, stop=nil) - node_for(key).bitpos(key, bit, start, stop) - end - - # Set the string value of a key and return its old value. - def getset(key, value) - node_for(key).getset(key, value) - end - - # Get the length of the value stored in a key. - def strlen(key) - node_for(key).strlen(key) - end - - def [](key) - get(key) - end - - def []=(key,value) - set(key, value) - end - - # Get the length of a list. - def llen(key) - node_for(key).llen(key) - end - - # Prepend one or more values to a list. - def lpush(key, value) - node_for(key).lpush(key, value) - end - - # Prepend a value to a list, only if the list exists. - def lpushx(key, value) - node_for(key).lpushx(key, value) - end - - # Append one or more values to a list. - def rpush(key, value) - node_for(key).rpush(key, value) - end - - # Append a value to a list, only if the list exists. - def rpushx(key, value) - node_for(key).rpushx(key, value) - end - - # Remove and get the first element in a list. - def lpop(key) - node_for(key).lpop(key) - end - - # Remove and get the last element in a list. - def rpop(key) - node_for(key).rpop(key) - end - - # Remove the last element in a list, append it to another list and return - # it. - def rpoplpush(source, destination) - ensure_same_node(:rpoplpush, [source, destination]) do |node| - node.rpoplpush(source, destination) - end - end - - def _bpop(cmd, args) - options = {} - - if args.last.is_a?(Hash) - options = args.pop - elsif args.last.respond_to?(:to_int) - # Issue deprecation notice in obnoxious mode... - options[:timeout] = args.pop.to_int - end - - if args.size > 1 - # Issue deprecation notice in obnoxious mode... - end - - keys = args.flatten - - ensure_same_node(cmd, keys) do |node| - node.__send__(cmd, keys, options) - end - end - - # Remove and get the first element in a list, or block until one is - # available. - def blpop(*args) - _bpop(:blpop, args) - end - - # Remove and get the last element in a list, or block until one is - # available. - def brpop(*args) - _bpop(:brpop, args) - end - - # Pop a value from a list, push it to another list and return it; or block - # until one is available. - def brpoplpush(source, destination, options = {}) - case options - when Integer - # Issue deprecation notice in obnoxious mode... - options = { :timeout => options } - end - - ensure_same_node(:brpoplpush, [source, destination]) do |node| - node.brpoplpush(source, destination, options) - end - end - - # Get an element from a list by its index. - def lindex(key, index) - node_for(key).lindex(key, index) + options = options.dup + tag = options.delete(:tag) + ring = options.delete(:ring) + options[:distributed] = { nodes: node_configs, tag: tag, ring: ring } + @redis = Redis.new(options) end - # Insert an element before or after another element in a list. - def linsert(key, where, pivot, value) - node_for(key).linsert(key, where, pivot, value) - end - - # Get a range of elements from a list. - def lrange(key, start, stop) - node_for(key).lrange(key, start, stop) - end - - # Remove elements from a list. - def lrem(key, count, value) - node_for(key).lrem(key, count, value) - end - - # Set the value of an element in a list by its index. - def lset(key, index, value) - node_for(key).lset(key, index, value) - end - - # Trim a list to the specified range. - def ltrim(key, start, stop) - node_for(key).ltrim(key, start, stop) - end - - # Get the number of members in a set. - def scard(key) - node_for(key).scard(key) - end - - # Add one or more members to a set. - def sadd(key, member) - node_for(key).sadd(key, member) - end - - # Remove one or more members from a set. - def srem(key, member) - node_for(key).srem(key, member) - end - - # Remove and return a random member from a set. - def spop(key, count = nil) - node_for(key).spop(key, count) - end - - # Get a random member from a set. - def srandmember(key, count = nil) - node_for(key).srandmember(key, count) - end - - # Move a member from one set to another. - def smove(source, destination, member) - ensure_same_node(:smove, [source, destination]) do |node| - node.smove(source, destination, member) - end - end - - # Determine if a given value is a member of a set. - def sismember(key, member) - node_for(key).sismember(key, member) - end - - # Get all the members in a set. - def smembers(key) - node_for(key).smembers(key) - end - - # Scan a set - def sscan(key, cursor, options={}) - node_for(key).sscan(key, cursor, options) - end - - # Scan a set and return an enumerator - def sscan_each(key, options={}, &block) - node_for(key).sscan_each(key, options, &block) - end - - # Subtract multiple sets. - def sdiff(*keys) - ensure_same_node(:sdiff, keys) do |node| - node.sdiff(*keys) - end - end - - # Subtract multiple sets and store the resulting set in a key. - def sdiffstore(destination, *keys) - ensure_same_node(:sdiffstore, [destination] + keys) do |node| - node.sdiffstore(destination, *keys) - end - end - - # Intersect multiple sets. - def sinter(*keys) - ensure_same_node(:sinter, keys) do |node| - node.sinter(*keys) - end - end - - # Intersect multiple sets and store the resulting set in a key. - def sinterstore(destination, *keys) - ensure_same_node(:sinterstore, [destination] + keys) do |node| - node.sinterstore(destination, *keys) - end - end - - # Add multiple sets. - def sunion(*keys) - ensure_same_node(:sunion, keys) do |node| - node.sunion(*keys) - end - end - - # Add multiple sets and store the resulting set in a key. - def sunionstore(destination, *keys) - ensure_same_node(:sunionstore, [destination] + keys) do |node| - node.sunionstore(destination, *keys) - end - end - - # Get the number of members in a sorted set. - def zcard(key) - node_for(key).zcard(key) - end - - # Add one or more members to a sorted set, or update the score for members - # that already exist. - def zadd(key, *args) - node_for(key).zadd(key, *args) - end - - # Increment the score of a member in a sorted set. - def zincrby(key, increment, member) - node_for(key).zincrby(key, increment, member) - end - - # Remove one or more members from a sorted set. - def zrem(key, member) - node_for(key).zrem(key, member) - end - - # Get the score associated with the given member in a sorted set. - def zscore(key, member) - node_for(key).zscore(key, member) - end - - # Return a range of members in a sorted set, by index. - def zrange(key, start, stop, options = {}) - node_for(key).zrange(key, start, stop, options) - end - - # Return a range of members in a sorted set, by index, with scores ordered - # from high to low. - def zrevrange(key, start, stop, options = {}) - node_for(key).zrevrange(key, start, stop, options) - end - - # Determine the index of a member in a sorted set. - def zrank(key, member) - node_for(key).zrank(key, member) - end - - # Determine the index of a member in a sorted set, with scores ordered from - # high to low. - def zrevrank(key, member) - node_for(key).zrevrank(key, member) - end - - # Remove all members in a sorted set within the given indexes. - def zremrangebyrank(key, start, stop) - node_for(key).zremrangebyrank(key, start, stop) - end - - # Return a range of members in a sorted set, by score. - def zrangebyscore(key, min, max, options = {}) - node_for(key).zrangebyscore(key, min, max, options) - end - - # Return a range of members in a sorted set, by score, with scores ordered - # from high to low. - def zrevrangebyscore(key, max, min, options = {}) - node_for(key).zrevrangebyscore(key, max, min, options) - end - - # Remove all members in a sorted set within the given scores. - def zremrangebyscore(key, min, max) - node_for(key).zremrangebyscore(key, min, max) - end - - # Get the number of members in a particular score range. - def zcount(key, min, max) - node_for(key).zcount(key, min, max) - end - - # Intersect multiple sorted sets and store the resulting sorted set in a new - # key. - def zinterstore(destination, keys, options = {}) - ensure_same_node(:zinterstore, [destination] + keys) do |node| - node.zinterstore(destination, keys, options) - end - end - - # Add multiple sorted sets and store the resulting sorted set in a new key. - def zunionstore(destination, keys, options = {}) - ensure_same_node(:zunionstore, [destination] + keys) do |node| - node.zunionstore(destination, keys, options) - end - end - - # Get the number of fields in a hash. - def hlen(key) - node_for(key).hlen(key) - end + private - # Set the string value of a hash field. - def hset(key, field, value) - node_for(key).hset(key, field, value) + def method_missing(name, *args, &block) + @redis.public_send(name, *args, &block) end - # Set the value of a hash field, only if the field does not exist. - def hsetnx(key, field, value) - node_for(key).hsetnx(key, field, value) - end - - # Set multiple hash fields to multiple values. - def hmset(key, *attrs) - node_for(key).hmset(key, *attrs) - end - - def mapped_hmset(key, hash) - node_for(key).hmset(key, *hash.to_a.flatten) - end - - # Get the value of a hash field. - def hget(key, field) - node_for(key).hget(key, field) - end - - # Get the values of all the given hash fields. - def hmget(key, *fields) - node_for(key).hmget(key, *fields) - end - - def mapped_hmget(key, *fields) - Hash[*fields.zip(hmget(key, *fields)).flatten] - end - - # Delete one or more hash fields. - def hdel(key, *fields) - node_for(key).hdel(key, *fields) - end - - # Determine if a hash field exists. - def hexists(key, field) - node_for(key).hexists(key, field) - end - - # Increment the integer value of a hash field by the given integer number. - def hincrby(key, field, increment) - node_for(key).hincrby(key, field, increment) - end - - # Increment the numeric value of a hash field by the given float number. - def hincrbyfloat(key, field, increment) - node_for(key).hincrbyfloat(key, field, increment) - end - - # Get all the fields in a hash. - def hkeys(key) - node_for(key).hkeys(key) - end - - # Get all the values in a hash. - def hvals(key) - node_for(key).hvals(key) - end - - # Get all the fields and values in a hash. - def hgetall(key) - node_for(key).hgetall(key) - end - - # Post a message to a channel. - def publish(channel, message) - node_for(channel).publish(channel, message) - end - - def subscribed? - !! @subscribed_node - end - - # Listen for messages published to the given channels. - def subscribe(channel, *channels, &block) - if channels.empty? - @subscribed_node = node_for(channel) - @subscribed_node.subscribe(channel, &block) - else - ensure_same_node(:subscribe, [channel] + channels) do |node| - @subscribed_node = node - node.subscribe(channel, *channels, &block) - end - end - end - - # Stop listening for messages posted to the given channels. - def unsubscribe(*channels) - raise RuntimeError, "Can't unsubscribe if not subscribed." unless subscribed? - @subscribed_node.unsubscribe(*channels) - end - - # Listen for messages published to channels matching the given patterns. - def psubscribe(*channels, &block) - raise NotImplementedError - end - - # Stop listening for messages posted to channels matching the given - # patterns. - def punsubscribe(*channels) - raise NotImplementedError - end - - # Watch the given keys to determine execution of the MULTI/EXEC block. - def watch(*keys) - raise CannotDistribute, :watch - end - - # Forget about all watched keys. - def unwatch - raise CannotDistribute, :unwatch - end - - def pipelined - raise CannotDistribute, :pipelined - end - - # Mark the start of a transaction block. - def multi - raise CannotDistribute, :multi - end - - # Execute all commands issued after MULTI. - def exec - raise CannotDistribute, :exec - end - - # Discard all commands issued after MULTI. - def discard - raise CannotDistribute, :discard - end - - # Control remote script registry. - def script(subcommand, *args) - on_each_node(:script, subcommand, *args) - end - - # Add one or more members to a HyperLogLog structure. - def pfadd(key, member) - node_for(key).pfadd(key, member) - end - - # Get the approximate cardinality of members added to HyperLogLog structure. - def pfcount(*keys) - ensure_same_node(:pfcount, keys.flatten(1)) do |node| - node.pfcount(keys) - end - end - - # Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of - # the observed Sets of the source HyperLogLog structures. - def pfmerge(dest_key, *source_key) - ensure_same_node(:pfmerge, [dest_key, *source_key]) do |node| - node.pfmerge(dest_key, *source_key) - end - end - - def _eval(cmd, args) - script = args.shift - options = args.pop if args.last.is_a?(Hash) - options ||= {} - - keys = args.shift || options[:keys] || [] - argv = args.shift || options[:argv] || [] - - ensure_same_node(cmd, keys) do |node| - node.send(cmd, script, keys, argv) - end - end - - # Evaluate Lua script. - def eval(*args) - _eval(:eval, args) - end - - # Evaluate Lua script by its SHA. - def evalsha(*args) - _eval(:evalsha, args) - end - - def inspect - "#" - end - - def dup - self.class.new(@node_configs, @default_options) - end - - protected - - def on_each_node(command, *args) - nodes.map do |node| - node.send(command, *args) - end - end - - def node_index_for(key) - nodes.index(node_for(key)) - end - - def key_tag(key) - key.to_s[@tag, 1] if @tag - end - - def ensure_same_node(command, keys) - all = true - - tags = keys.map do |key| - tag = key_tag(key) - all = false unless tag - tag - end - - if (all && tags.uniq.size != 1) || (!all && keys.uniq.size != 1) - # Not 1 unique tag or not 1 unique key - raise CannotDistribute, command - end - - yield(node_for(keys.first)) + def respond_to_missing?(name, include_private) + @redis.respond_to?(name, include_private) end end end diff --git a/lib/redis/distributed/hash_ring.rb b/lib/redis/distributed/hash_ring.rb new file mode 100644 index 000000000..829106ae5 --- /dev/null +++ b/lib/redis/distributed/hash_ring.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +require 'zlib' + +class Redis + class Distributed + # Client side consistent hashing + class HashRing + POINTS_PER_SERVER = 160 # this is the default in libmemcached + + attr_reader :nodes + + # Find the closest index in HashRing with value <= the given value + def self.binary_search(ary, value) + upper = ary.size - 1 + lower = 0 + idx = 0 + + while lower <= upper + idx = (lower + upper) / 2 + comp = ary[idx] <=> value + + return idx if comp == 0 + + if comp > 0 + upper = idx - 1 + else + lower = idx + 1 + end + end + + upper = ary.size - 1 if upper < 0 + upper + end + + # nodes is a list of objects that have a proper to_s representation. + # replicas indicates how many virtual points should be used pr. node, + # replicas are required to improve the distribution. + def initialize(nodes = [], replicas = POINTS_PER_SERVER) + @replicas = replicas + @ring = {} + @nodes = [] + @sorted_keys = [] + nodes.each { |node| add_node(node) } + end + + # Adds a `node` to the hash ring (including a number of replicas). + def add_node(node) + @nodes << node + @replicas.times do |i| + key = Zlib.crc32("#{node.id}:#{i}") + @ring[key] = node + @sorted_keys << key + end + @sorted_keys.sort! + end + + def remove_node(node) + @nodes.reject! { |n| n.id == node.id } + @replicas.times do |i| + key = Zlib.crc32("#{node.id}:#{i}") + @ring.delete(key) + @sorted_keys.reject! { |k| k == key } + end + end + + # get the node in the hash ring for this key + def get_node(key) + get_node_pos(key)[0] + end + + def get_node_pos(key) + return [nil, nil] if @ring.empty? + crc = Zlib.crc32(key) + idx = self.class.binary_search(@sorted_keys, crc) + [@ring[@sorted_keys[idx]], idx] + end + + def iter_nodes(key) + return [nil, nil] if @ring.empty? + _, pos = get_node_pos(key) + @ring.size.times do |n| + yield @ring[@sorted_keys[(pos + n) % @ring.size]] + end + end + end + end +end diff --git a/lib/redis/distributed/partitioner.rb b/lib/redis/distributed/partitioner.rb new file mode 100644 index 000000000..8c97c4889 --- /dev/null +++ b/lib/redis/distributed/partitioner.rb @@ -0,0 +1,222 @@ +# frozen_string_literal: true + +require_relative '../errors' +require_relative '../client' +require_relative 'hash_ring' + +class Redis + class Distributed + # Partitioning with client side consistent hashing + class Partitioner + def initialize(options = {}) + @options = options.dup + @tag_fmt = @options.dig(:distributed, :tag) || /^\{(.+?)\}/ + @ring = @options.dig(:distributed, :ring) || HashRing.new + node_configs = @options.dig(:distributed, :nodes) + @options.delete(:distributed) + clients = build_node_clients(node_configs, @options) + clients.each { |c| @ring.add_node(c) } + @command = build_command_info(@ring.nodes) + end + + def id + @ring.nodes.map(&:id).sort.join(', ') + end + + def db + @ring.nodes.first.db + end + + def db=(_db) + raise CannotDistribute, 'select' + end + + def timeout + @ring.nodes.first.timeout + end + + def connected? + @ring.nodes.any?(&:connected?) + end + + def disconnect + @ring.nodes.each(&:disconnect) + true + end + + def connection_info + @ring.nodes.sort_by(&:id).map do |n| + { host: n.host, port: n.port, db: n.db, id: n.id, location: n.location } + end + end + + def with_reconnect(val = true, &block) + @ring.nodes.sample.with_reconnect(val, &block) + end + + def call(command, &block) + send_command(command, &block) + end + + def call_loop(command, timeout = 0, &block) + raise(CannotDistribute, 'monitor') if command.first.to_s.casecmp('monitor').zero? + node_for(command).call_loop(command, timeout, &block) + end + + def call_pipeline(pipeline) + raise CannotDistribute, pipeline.commands.map(&:first).join(',') + end + + def call_with_timeout(command, timeout, &block) + node_for(command).call_with_timeout(command, timeout, &block) + end + + def call_without_timeout(command, &block) + call_with_timeout(command, 0, &block) + end + + def process(commands, &block) + if unsubscription_command?(commands) + @ring.nodes.map { |n| n.process(commands, &block) } + else + node_for(commands.first).process(commands, &block) + end + end + + private + + def build_node_clients(node_configs, options) + node_configs.map { |c| build_client(c, options) } + end + + def build_client(config, options) + config = config.is_a?(String) ? { url: config } : config + Client.new(options.merge(config)) + end + + def build_command_info(nodes) + details = {} + + nodes.each do |node| + details = fetch_command_details(node) + details.empty? ? next : break + end + + details + end + + def fetch_command_details(node) + node.call(%i[command]).map do |reply| + [reply[0], { arity: reply[1], flags: reply[2], first: reply[3], last: reply[4], step: reply[5] }] + end.to_h + rescue CannotConnectError, ConnectionError, CommandError + {} # can retry on another node + end + + def send_command(command, &block) + case cmd = command.first.to_s.downcase + when 'echo' then send_command_each_node(command, &block).uniq + when 'keys' then send_command_each_node(command, &block).flatten + when 'mget' then send_mget_command(command, &block) + when 'script' then send_script_command(command, &block) + when 'auth', 'bgrewriteaof', 'bgsave', 'dbsize', 'flushall', 'flushdb', + 'info', 'lastsave', 'ping', 'quit', 'role', 'save', 'time', 'wait' + send_command_each_node(command, &block) + when 'client', 'cluster', 'config', 'discard', 'exec', 'memory', + 'migrate', 'multi', 'psubscribe', 'pubsub', 'randomkey', + 'readonly', 'readwrite', 'select', 'shutdown', 'unwatch', 'watch' + raise CannotDistribute, cmd + when 'node_for' then @ring.get_node(extract_tag_if_needed(command[1])) # for backward compatibility + when 'nodes' then @ring.nodes # for backward compatibility as possible + when 'add_node' then @ring.add_node(build_client(command[1], @options)) # for backward compatibility + else node_for(command).call(command, &block) + end + end + + def send_script_command(command, &block) + case command[1].to_s.downcase + when 'debug', 'kill', 'flush', 'load' + send_command_each_node(command, &block) + else node_for(command).call(command, &block) + end + end + + def send_command_each_node(command, &block) + @ring.nodes.map { |node| node.call(command, &block) } + end + + def send_mget_command(command) + keys = extract_keys(command) + vals = keys.map { |k| [k, @ring.get_node(extract_tag_if_needed(k))] } + .group_by { |_, node| node.id } + .map { |_, pairs| [pairs[0][1], pairs.map(&:first)] } + .map { |node, ks| ks.zip(node.call(%w[mget] + ks)).to_h } + .reduce(&:merge) + .values_at(*keys) + block_given? ? yield(vals) : vals + end + + def node_for(command) + keys = extract_keys(command) + return @ring.nodes.sample if keys.empty? + + assert_same_node!(keys) + @ring.get_node(extract_tag_if_needed(keys.first)) + end + + def extract_keys(command) + cmds = command.flatten.map(&:to_s).map { |s| s.valid_encoding? ? s.downcase : s } + cmd = cmds.first + info = @command[cmd] + return [] if keyless_command?(cmd, info) + + last_pos = cmds.size - 1 + + case cmd + when 'publish' then [1] + when 'subscribe' then (1..last_pos).to_a + when 'memory' then cmds[1].casecmp('usage').zero? ? [2] : [] + when 'eval', 'evalsha' then (3..cmds[2].to_i + 2).to_a + when 'psubscribe', 'pubsub', 'punsubscribe', 'unsubscribe', 'migrate' then [] + when 'sort' + by = cmds.index('by') + store = cmds.index('store') + gets = cmds.map.with_index { |w, i| w == 'get' ? i + 1 : nil } + [1, (by ? by + 1 : nil), (store ? store + 1 : nil), *gets].compact + when 'zinterstore', 'zunionstore' + last = cmds.index('weights') || cmds.index('aggregate') || last_pos + 1 + [1] + (3..last - 1).to_a + when 'xread', 'xreadgroup' + idx = cmds.index('streams') + idx.nil? ? [] : (idx + 1..last_pos).to_a.slice(0, (last_pos - idx - 1) / 2 + 1) + else + last = info[:last] < 0 ? last_pos + info[:last] + 1 : info[:last] + range = info[:first]..last + (info[:step] < 1 ? range : range.step(info[:step])).to_a + end.map { |i| cmds[i] } + end + + def extract_tag_if_needed(key) + key.to_s.slice(@tag_fmt, 1) || key + end + + def assert_same_node!(keys) + node_ids = keys.map { |k| @ring.get_node(extract_tag_if_needed(k)).id }.uniq + raise(CannotDistribute, keys.join(',')) if node_ids.size > 1 + end + + def keyless_command?(cmd, info) + info.nil? || + (info[:first] < 1 && + (info[:flags] & %w[pubsub movablekeys]).empty? && + (%w[memory] & [cmd]).empty?) + end + + def unsubscription_command?(commands) + commands.size == 1 && + %w[unsubscribe punsubscribe].include?(commands.first.first.to_s.downcase) && + commands.first.size == 1 + end + end + end +end diff --git a/lib/redis/errors.rb b/lib/redis/errors.rb index 1483edb62..6722ac45f 100644 --- a/lib/redis/errors.rb +++ b/lib/redis/errors.rb @@ -42,6 +42,21 @@ class InheritedError < BaseConnectionError class InvalidClientOptionError < BaseError end + class Distributed + # Raised when the command is not supported in Redis::Distributed. + class CannotDistribute < RuntimeError + def initialize(command) + @command = command + end + + def message + "#{@command.to_s.upcase} cannot be used in client side partitioning "\ + 'because the keys involved need to be on the same server or '\ + 'because we cannot guarantee that the operation will be atomic.' + end + end + end + class Cluster # Raised when client connected to redis as cluster mode # and some cluster subcommands were called. diff --git a/lib/redis/hash_ring.rb b/lib/redis/hash_ring.rb deleted file mode 100644 index e1c93b25a..000000000 --- a/lib/redis/hash_ring.rb +++ /dev/null @@ -1,88 +0,0 @@ -require 'zlib' - -class Redis - class HashRing - - POINTS_PER_SERVER = 160 # this is the default in libmemcached - - attr_reader :ring, :sorted_keys, :replicas, :nodes - - # nodes is a list of objects that have a proper to_s representation. - # replicas indicates how many virtual points should be used pr. node, - # replicas are required to improve the distribution. - def initialize(nodes=[], replicas=POINTS_PER_SERVER) - @replicas = replicas - @ring = {} - @nodes = [] - @sorted_keys = [] - nodes.each do |node| - add_node(node) - end - end - - # Adds a `node` to the hash ring (including a number of replicas). - def add_node(node) - @nodes << node - @replicas.times do |i| - key = Zlib.crc32("#{node.id}:#{i}") - @ring[key] = node - @sorted_keys << key - end - @sorted_keys.sort! - end - - def remove_node(node) - @nodes.reject!{|n| n.id == node.id} - @replicas.times do |i| - key = Zlib.crc32("#{node.id}:#{i}") - @ring.delete(key) - @sorted_keys.reject! {|k| k == key} - end - end - - # get the node in the hash ring for this key - def get_node(key) - get_node_pos(key)[0] - end - - def get_node_pos(key) - return [nil,nil] if @ring.size == 0 - crc = Zlib.crc32(key) - idx = HashRing.binary_search(@sorted_keys, crc) - return [@ring[@sorted_keys[idx]], idx] - end - - def iter_nodes(key) - return [nil,nil] if @ring.size == 0 - _, pos = get_node_pos(key) - @ring.size.times do |n| - yield @ring[@sorted_keys[(pos+n) % @ring.size]] - end - end - - # Find the closest index in HashRing with value <= the given value - def self.binary_search(ary, value, &block) - upper = ary.size - 1 - lower = 0 - idx = 0 - - while(lower <= upper) do - idx = (lower + upper) / 2 - comp = ary[idx] <=> value - - if comp == 0 - return idx - elsif comp > 0 - upper = idx - 1 - else - lower = idx + 1 - end - end - - if upper < 0 - upper = ary.size - 1 - end - return upper - end - end -end diff --git a/makefile b/makefile index fff9dcaa1..fb2e581a7 100644 --- a/makefile +++ b/makefile @@ -18,6 +18,10 @@ CLUSTER_PORTS := 7000 7001 7002 7003 7004 7005 CLUSTER_PID_PATHS := $(addprefix ${TMP}/redis,$(addsuffix .pid,${CLUSTER_PORTS})) CLUSTER_CONF_PATHS := $(addprefix ${TMP}/nodes,$(addsuffix .conf,${CLUSTER_PORTS})) CLUSTER_ADDRS := $(addprefix 127.0.0.1:,${CLUSTER_PORTS}) +NODE2_PORT := 6383 +NODE2_PID_PATH := ${BUILD_DIR}/redis_node2.pid +NODE2_SOCKET_PATH := ${BUILD_DIR}/redis_node2.sock + define kill-redis (ls $1 2> /dev/null && kill $$(cat $1) && rm -f $1) || true @@ -31,6 +35,7 @@ all: start_all: make start make start_slave + make start_node2 make start_sentinel make start_cluster make create_cluster @@ -40,6 +45,7 @@ stop_all: make stop_slave make stop make stop_cluster + make stop_node2 ${TMP}: mkdir -p $@ @@ -114,8 +120,18 @@ create_cluster: yes yes | ((bundle exec ruby ${REDIS_TRIB} create --replicas 1 ${CLUSTER_ADDRS}) || \ (${REDIS_CLIENT} --cluster create ${CLUSTER_ADDRS} --cluster-replicas 1)) +stop_node2: + $(call kill-redis,${NODE2_PID_PATH}) + +start_node2: ${BINARY} + ${BINARY}\ + --daemonize yes\ + --pidfile ${NODE2_PID_PATH}\ + --port ${NODE2_PORT}\ + --unixsocket ${NODE2_SOCKET_PATH} + clean: (test -d ${BUILD_DIR} && cd ${BUILD_DIR}/src && make clean distclean) || true .PHONY: all test stop start stop_slave start_slave stop_sentinel start_sentinel\ - stop_cluster start_cluster create_cluster stop_all start_all clean + stop_cluster start_cluster create_cluster stop_node2 start_node2 stop_all start_all clean diff --git a/test/distributed_blocking_commands_test.rb b/test/distributed_blocking_commands_test.rb index 0ac67bd9e..6571fe56e 100644 --- a/test/distributed_blocking_commands_test.rb +++ b/test/distributed_blocking_commands_test.rb @@ -7,45 +7,43 @@ class TestDistributedBlockingCommands < Test::Unit::TestCase def test_blpop_raises assert_raises(Redis::Distributed::CannotDistribute) do - r.blpop(%w[foo bar]) + r.blpop(%w[key1 key4]) end end def test_blpop_raises_with_old_prototype assert_raises(Redis::Distributed::CannotDistribute) do - r.blpop('foo', 'bar', 0) + r.blpop('key1', 'key4', 0) end end def test_brpop_raises - assert_raises(Redis::Distributed::CannotDistribute) do - r.brpop(%w[foo bar]) + target_version('3.2.0') do + # There is a bug Redis 3.0's COMMAND command + assert_raises(Redis::Distributed::CannotDistribute) do + r.brpop(%w[key1 key4]) + end end end def test_brpop_raises_with_old_prototype - assert_raises(Redis::Distributed::CannotDistribute) do - r.brpop('foo', 'bar', 0) + target_version('3.2.0') do + # There is a bug Redis 3.0's COMMAND command + assert_raises(Redis::Distributed::CannotDistribute) do + r.brpop('key1', 'key4', 0) + end end end def test_brpoplpush_raises assert_raises(Redis::Distributed::CannotDistribute) do - r.brpoplpush('foo', 'bar') + r.brpoplpush('key1', 'key4') end end def test_brpoplpush_raises_with_old_prototype assert_raises(Redis::Distributed::CannotDistribute) do - r.brpoplpush('foo', 'bar', 0) + r.brpoplpush('key1', 'key4', 0) end end - - def test_bzpopmin - # Not implemented yet - end - - def test_bzpopmax - # Not implemented yet - end end diff --git a/test/distributed_commands_on_hashes_test.rb b/test/distributed_commands_on_hashes_test.rb index 9ec2902d6..55265593f 100644 --- a/test/distributed_commands_on_hashes_test.rb +++ b/test/distributed_commands_on_hashes_test.rb @@ -5,14 +5,6 @@ class TestDistributedCommandsOnHashes < Test::Unit::TestCase include Helper::Distributed include Lint::Hashes - def test_hscan - # Not implemented yet - end - - def test_hstrlen - # Not implemented yet - end - def test_mapped_hmget_in_a_pipeline_returns_hash assert_raise(Redis::Distributed::CannotDistribute) do super diff --git a/test/distributed_commands_on_hyper_log_log_test.rb b/test/distributed_commands_on_hyper_log_log_test.rb index b63cfb9e9..30abbc3fd 100644 --- a/test/distributed_commands_on_hyper_log_log_test.rb +++ b/test/distributed_commands_on_hyper_log_log_test.rb @@ -16,10 +16,10 @@ def test_pfmerge def test_pfcount_multiple_keys_diff_nodes target_version '2.8.9' do assert_raise Redis::Distributed::CannotDistribute do - r.pfadd 'foo', 's1' - r.pfadd 'bar', 's2' + r.pfadd 'key1', 's1' + r.pfadd 'key4', 's2' - assert r.pfcount('res', 'foo', 'bar') + assert r.pfcount('res', 'key1', 'key4') end end end diff --git a/test/distributed_commands_on_lists_test.rb b/test/distributed_commands_on_lists_test.rb index fa5caf92e..392dd415c 100644 --- a/test/distributed_commands_on_lists_test.rb +++ b/test/distributed_commands_on_lists_test.rb @@ -7,13 +7,13 @@ class TestDistributedCommandsOnLists < Test::Unit::TestCase def test_rpoplpush assert_raise Redis::Distributed::CannotDistribute do - r.rpoplpush('foo', 'bar') + r.rpoplpush('key1', 'key4') end end def test_brpoplpush assert_raise Redis::Distributed::CannotDistribute do - r.brpoplpush('foo', 'bar', timeout: 1) + r.brpoplpush('key1', 'key4', timeout: 1) end end end diff --git a/test/distributed_commands_on_sets_test.rb b/test/distributed_commands_on_sets_test.rb index db9c66a87..ebd0cd7bc 100644 --- a/test/distributed_commands_on_sets_test.rb +++ b/test/distributed_commands_on_sets_test.rb @@ -7,74 +7,74 @@ class TestDistributedCommandsOnSets < Test::Unit::TestCase def test_smove assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'bar', 's2' + r.sadd 'key1', 's1' + r.sadd 'key4', 's2' - r.smove('foo', 'bar', 's1') + r.smove('key1', 'key4', 's1') end end def test_sinter assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' - r.sinter('foo', 'bar') + r.sinter('key1', 'key4') end end def test_sinterstore assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' - r.sinterstore('baz', 'foo', 'bar') + r.sinterstore('baz', 'key1', 'key4') end end def test_sunion assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' - r.sadd 'bar', 's3' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' + r.sadd 'key4', 's3' - r.sunion('foo', 'bar') + r.sunion('key1', 'key4') end end def test_sunionstore assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' - r.sadd 'bar', 's3' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' + r.sadd 'key4', 's3' - r.sunionstore('baz', 'foo', 'bar') + r.sunionstore('baz', 'key1', 'key4') end end def test_sdiff assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' - r.sadd 'bar', 's3' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' + r.sadd 'key4', 's3' - r.sdiff('foo', 'bar') + r.sdiff('key1', 'key4') end end def test_sdiffstore assert_raise Redis::Distributed::CannotDistribute do - r.sadd 'foo', 's1' - r.sadd 'foo', 's2' - r.sadd 'bar', 's2' - r.sadd 'bar', 's3' + r.sadd 'key1', 's1' + r.sadd 'key1', 's2' + r.sadd 'key4', 's2' + r.sadd 'key4', 's3' - r.sdiffstore('baz', 'foo', 'bar') + r.sdiffstore('baz', 'key1', 'key4') end end diff --git a/test/distributed_commands_on_sorted_sets_test.rb b/test/distributed_commands_on_sorted_sets_test.rb index 3beb1c505..846f019f0 100644 --- a/test/distributed_commands_on_sorted_sets_test.rb +++ b/test/distributed_commands_on_sorted_sets_test.rb @@ -17,34 +17,6 @@ def test_zinterstore_with_weights assert_raise(Redis::Distributed::CannotDistribute) { super } end - def test_zlexcount - # Not implemented yet - end - - def test_zpopmax - # Not implemented yet - end - - def test_zpopmin - # Not implemented yet - end - - def test_zrangebylex - # Not implemented yet - end - - def test_zremrangebylex - # Not implemented yet - end - - def test_zrevrangebylex - # Not implemented yet - end - - def test_zscan - # Not implemented yet - end - def test_zunionstore assert_raise(Redis::Distributed::CannotDistribute) { super } end diff --git a/test/distributed_commands_on_streams_test.rb b/test/distributed_commands_on_streams_test.rb new file mode 100644 index 000000000..1fb9f5d83 --- /dev/null +++ b/test/distributed_commands_on_streams_test.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require_relative 'helper' +require_relative 'lint/streams' + +class TestDistributedCommandsOnStreams < Test::Unit::TestCase + include Helper::Distributed + include Lint::Streams + + def test_xread_with_multiple_node_keys + redis.xadd('key1', { f: 'v01' }, id: '0-1') + redis.xadd('key1', { f: 'v02' }, id: '0-2') + redis.xadd('key4', { f: 'v11' }, id: '1-1') + redis.xadd('key4', { f: 'v12' }, id: '1-2') + + assert_raise(Redis::Distributed::CannotDistribute) { redis.xread(%w[key1 key4], %w[0-1 1-1]) } + end + + def test_xreadgroup_with_multiple_node_keys + redis.xadd('key1', { f: 'v01' }, id: '0-1') + redis.xgroup(:create, 'key1', 'g1', '$') + redis.xadd('key4', { f: 'v11' }, id: '1-1') + redis.xgroup(:create, 'key4', 'g1', '$') + redis.xadd('key1', { f: 'v02' }, id: '0-2') + redis.xadd('key4', { f: 'v12' }, id: '1-2') + + assert_raise(Redis::Distributed::CannotDistribute) { redis.xreadgroup('g1', 'c1', %w[key1 key4], %w[> >]) } + end +end diff --git a/test/distributed_commands_on_strings_test.rb b/test/distributed_commands_on_strings_test.rb index e50ccf444..8904a2ac4 100644 --- a/test/distributed_commands_on_strings_test.rb +++ b/test/distributed_commands_on_strings_test.rb @@ -32,37 +32,37 @@ def test_mget_mapped def test_mset assert_raise Redis::Distributed::CannotDistribute do - r.mset(:foo, "s1", :bar, "s2") + r.mset(:key1, "s1", :key4, "s2") end end def test_mset_mapped assert_raise Redis::Distributed::CannotDistribute do - r.mapped_mset(:foo => "s1", :bar => "s2") + r.mapped_mset(:key1 => "s1", :key4 => "s2") end end def test_msetnx assert_raise Redis::Distributed::CannotDistribute do - r.set("foo", "s1") - r.msetnx(:foo, "s2", :bar, "s3") + r.set("key1", "s1") + r.msetnx(:key1, "s2", :key4, "s3") end end def test_msetnx_mapped assert_raise Redis::Distributed::CannotDistribute do - r.set("foo", "s1") - r.mapped_msetnx(:foo => "s2", :bar => "s3") + r.set("key1", "s1") + r.mapped_msetnx(:key1 => "s2", :key4 => "s3") end end def test_bitop target_version "2.5.10" do assert_raise Redis::Distributed::CannotDistribute do - r.set("foo", "a") - r.set("bar", "b") + r.set("key1", "a") + r.set("key4", "b") - r.bitop(:and, "foo&bar", "foo", "bar") + r.bitop(:and, "key1&key4", "key1", "key4") end end end @@ -72,8 +72,4 @@ def test_mapped_mget_in_a_pipeline_returns_hash super end end - - def test_bitfield - # Not implemented yet - end end diff --git a/test/distributed_commands_on_value_types_test.rb b/test/distributed_commands_on_value_types_test.rb index e8f21534c..bacc293da 100644 --- a/test/distributed_commands_on_value_types_test.rb +++ b/test/distributed_commands_on_value_types_test.rb @@ -6,6 +6,14 @@ class TestDistributedCommandsOnValueTypes < Test::Unit::TestCase include Helper::Distributed include Lint::ValueTypes + def test_move + assert_raise(Redis::Distributed::CannotDistribute) { super } + + r.set('key1', 'v1') + assert r.move('key1', 14) + assert_equal nil, r.get('key1') + end + def test_del r.set "foo", "s1" r.set "bar", "s2" @@ -82,48 +90,48 @@ def test_randomkey def test_rename assert_raise Redis::Distributed::CannotDistribute do - r.set("foo", "s1") - r.rename "foo", "bar" + r.set("key1", "s1") + r.rename "key1", "key4" end - assert_equal "s1", r.get("foo") - assert_equal nil, r.get("bar") + assert_equal "s1", r.get("key1") + assert_equal nil, r.get("key4") end def test_renamenx assert_raise Redis::Distributed::CannotDistribute do - r.set("foo", "s1") - r.rename "foo", "bar" + r.set("key1", "s1") + r.rename "key1", "key4" end - assert_equal "s1", r.get("foo") - assert_equal nil , r.get("bar") + assert_equal "s1", r.get("key1") + assert_equal nil , r.get("key4") end def test_dbsize - assert_equal [0], r.dbsize + assert_equal [0, 0], r.dbsize - r.set("foo", "s1") + r.set("key1", "s1") - assert_equal [1], r.dbsize + assert_equal [1, 0], r.dbsize end def test_flushdb - r.set("foo", "s1") - r.set("bar", "s2") + r.set("key1", "s1") + r.set("key4", "s2") - assert_equal [2], r.dbsize + assert_equal [1, 1], r.dbsize r.flushdb - assert_equal [0], r.dbsize + assert_equal [0, 0], r.dbsize end def test_migrate r.set("foo", "s1") assert_raise Redis::Distributed::CannotDistribute do - r.migrate("foo", {}) + r.migrate('foo', host: '127.0.0.1', port: PORT) end end end diff --git a/test/distributed_connection_handling_test.rb b/test/distributed_connection_handling_test.rb index 335046abe..b0d5dc3ef 100644 --- a/test/distributed_connection_handling_test.rb +++ b/test/distributed_connection_handling_test.rb @@ -1,21 +1,13 @@ require_relative "helper" class TestDistributedConnectionHandling < Test::Unit::TestCase - include Helper::Distributed def test_ping - assert_equal ["PONG"], r.ping + assert_equal %w[PONG PONG], r.ping end def test_select - r.set "foo", "bar" - - r.select 14 - assert_equal nil, r.get("foo") - - r.select 15 - - assert_equal "bar", r.get("foo") + assert_raise(Redis::Distributed::CannotDistribute) { r.select 14 } end end diff --git a/test/distributed_internals_test.rb b/test/distributed_internals_test.rb index 82b9df3ea..289cfca23 100644 --- a/test/distributed_internals_test.rb +++ b/test/distributed_internals_test.rb @@ -1,68 +1,65 @@ require_relative "helper" class TestDistributedInternals < Test::Unit::TestCase - include Helper::Distributed def test_provides_a_meaningful_inspect - nodes = ["redis://127.0.0.1:#{PORT}/15", *NODES] - redis = Redis::Distributed.new nodes - - assert_equal "#", redis.inspect + expected = "#" + assert_equal expected, redis.inspect end def test_default_as_urls - nodes = ["redis://127.0.0.1:#{PORT}/15", *NODES] - redis = Redis::Distributed.new nodes - assert_equal ["redis://127.0.0.1:#{PORT}/15", *NODES], redis.nodes.map { |node| node._client.id } + expected = ["redis://127.0.0.1:#{PORT}/#{DB}", "redis://127.0.0.1:#{NODE2_PORT}/#{DB}"] + assert_equal expected, redis.nodes.map(&:id) end def test_default_as_config_hashes - nodes = [OPTIONS.merge(:host => '127.0.0.1'), OPTIONS.merge(:host => 'somehost', :port => PORT.next)] - redis = Redis::Distributed.new nodes - assert_equal ["redis://127.0.0.1:#{PORT}/15","redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.id } + nodes = [{ host: '127.0.0.1', port: PORT, db: DB }, { host: '127.0.0.1', port: NODE2_PORT, db: DB }] + redis = build_another_client(distributed: { nodes: nodes }) + expected = ["redis://127.0.0.1:#{PORT}/#{DB}", "redis://127.0.0.1:#{NODE2_PORT}/#{DB}"] + assert_equal expected, redis.nodes.map(&:id) end def test_as_mix_and_match - nodes = ["redis://127.0.0.1:7389/15", OPTIONS.merge(:host => 'somehost'), OPTIONS.merge(:host => 'somehost', :port => PORT.next)] - redis = Redis::Distributed.new nodes - assert_equal ["redis://127.0.0.1:7389/15", "redis://somehost:#{PORT}/15", "redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.id } + nodes = ["redis://127.0.0.1:#{PORT}/#{DB}", { host: '127.0.0.1', port: PORT, db: 14 }, { host: '127.0.0.1', port: NODE2_PORT, db: DB }] + redis = build_another_client(distributed: { nodes: nodes }) + expected = ["redis://127.0.0.1:#{PORT}/#{DB}", "redis://127.0.0.1:#{PORT}/14", "redis://127.0.0.1:#{NODE2_PORT}/#{DB}"] + assert_equal expected, redis.nodes.map(&:id) end def test_override_id - nodes = [OPTIONS.merge(:host => '127.0.0.1', :id => "test"), OPTIONS.merge( :host => 'somehost', :port => PORT.next, :id => "test1")] - redis = Redis::Distributed.new nodes - assert_equal redis.nodes.first._client.id, "test" - assert_equal redis.nodes.last._client.id, "test1" - assert_equal "#", redis.inspect + nodes = [{ host: '127.0.0.1', port: PORT, db: DB, id: 'test1' }, { host: '127.0.0.1', port: NODE2_PORT, db: DB, id: 'test2' }] + redis = build_another_client(distributed: { nodes: nodes }) + assert_equal redis.nodes.first.id, 'test1' + assert_equal redis.nodes.last.id, 'test2' + expected = "#" + assert_equal expected, redis.inspect end def test_can_be_duped_to_create_a_new_connection - redis = Redis::Distributed.new(NODES) - - clients = redis.info[0]["connected_clients"].to_i + clients = redis.info[0]['connected_clients'].to_i r2 = redis.dup r2.ping - assert_equal clients + 1, redis.info[0]["connected_clients"].to_i + assert_equal clients + 1, redis.info[0]['connected_clients'].to_i end def test_keeps_options_after_dup - r1 = Redis::Distributed.new(NODES, :tag => /^(\w+):/) + r1 = build_another_client(distributed: { nodes: NODES, tag: /^(\w+):/ }) assert_raise(Redis::Distributed::CannotDistribute) do - r1.sinter("foo", "bar") + r1.sinter('key1', 'key4') end - assert_equal [], r1.sinter("baz:foo", "baz:bar") + assert_equal [], r1.sinter('baz:foo', 'baz:bar') r2 = r1.dup assert_raise(Redis::Distributed::CannotDistribute) do - r2.sinter("foo", "bar") + r2.sinter('key1', 'key4') end - assert_equal [], r2.sinter("baz:foo", "baz:bar") + assert_equal [], r2.sinter('baz:foo', 'baz:bar') end end diff --git a/test/distributed_key_tags_test.rb b/test/distributed_key_tags_test.rb index a778fec72..d4275c140 100644 --- a/test/distributed_key_tags_test.rb +++ b/test/distributed_key_tags_test.rb @@ -1,21 +1,19 @@ require_relative "helper" class TestDistributedKeyTags < Test::Unit::TestCase - include Helper include Helper::Distributed def test_hashes_consistently - r1 = Redis::Distributed.new ["redis://127.0.0.1:#{PORT}/15", *NODES] - r2 = Redis::Distributed.new ["redis://127.0.0.1:#{PORT}/15", *NODES] - r3 = Redis::Distributed.new ["redis://127.0.0.1:#{PORT}/15", *NODES] + r1 = build_another_client + r2 = build_another_client + r3 = build_another_client - assert_equal r1.node_for("foo").id, r2.node_for("foo").id - assert_equal r1.node_for("foo").id, r3.node_for("foo").id + assert_equal r1.node_for('foo').id, r2.node_for('foo').id + assert_equal r1.node_for('foo').id, r3.node_for('foo').id end def test_allows_clustering_of_keys - r = Redis::Distributed.new(NODES) r.add_node("redis://127.0.0.1:#{PORT}/14") r.flushdb @@ -23,21 +21,21 @@ def test_allows_clustering_of_keys r.set "{foo}users:#{i}", i end - assert_equal [0, 100], r.nodes.map { |node| node.keys.size } + assert_equal([0, 0, 100], r.nodes.map { |node| node.call(%i[keys *]).size }) end def test_distributes_keys_if_no_clustering_is_used r.add_node("redis://127.0.0.1:#{PORT}/14") r.flushdb - r.set "users:1", 1 - r.set "users:4", 4 + r.set 'users:1', 1 + r.set 'users:4', 4 - assert_equal [1, 1], r.nodes.map { |node| node.keys.size } + assert_equal([1, 1, 0], r.nodes.map { |node| node.call(%i[keys *]).size }) end def test_allows_passing_a_custom_tag_extractor - r = Redis::Distributed.new(NODES, :tag => /^(.+?):/) + r = build_another_client(distributed: { nodes: NODES, tag: /^(.+?):/ }) r.add_node("redis://127.0.0.1:#{PORT}/14") r.flushdb @@ -45,6 +43,6 @@ def test_allows_passing_a_custom_tag_extractor r.set "foo:users:#{i}", i end - assert_equal [0, 100], r.nodes.map { |node| node.keys.size } + assert_equal([0, 0, 100], r.nodes.map { |node| node.call(%i[keys *]).size }) end end diff --git a/test/distributed_partitioner_test.rb b/test/distributed_partitioner_test.rb new file mode 100644 index 000000000..499b43fc6 --- /dev/null +++ b/test/distributed_partitioner_test.rb @@ -0,0 +1,165 @@ +# frozen_string_literal: true + +require_relative 'helper' + +class TestDistributedPartitioner < Test::Unit::TestCase + include Helper::Distributed + + def build_client + nodes = ["redis://127.0.0.1:#{PORT}/#{DB}", "redis://127.0.0.1:#{NODE2_PORT}/#{DB}"] + opts = { distributed: { nodes: nodes }, timeout: TIMEOUT } + Redis::Distributed::Partitioner.new(opts) + end + + def test_key_extraction_with_get_command + c = build_client + assert_equal %w[foo], c.send(:extract_keys, %w[get foo]) + assert_equal %w[{foo}bar], c.send(:extract_keys, %w[get {foo}bar]) + assert_equal %w[fo{oba}r], c.send(:extract_keys, %w[get fo{oba}r]) + end + + def test_key_extraction_with_set_command + c = build_client + assert_equal %w[foo], c.send(:extract_keys, %w[set foo 1]) + assert_equal %w[{foo}bar], c.send(:extract_keys, %w[set {foo}bar 1]) + assert_equal %w[fo{oba}r], c.send(:extract_keys, %w[set fo{oba}r 1]) + end + + def test_key_extraction_with_mget_command + c = build_client + assert_equal %w[foo bar], c.send(:extract_keys, %w[mget foo bar]) + assert_equal %w[{key}foo {key}bar], c.send(:extract_keys, %w[mget {key}foo {key}bar]) + assert_equal %w[{key1}foo {key2}bar], c.send(:extract_keys, %w[mget {key1}foo {key2}bar]) + assert_equal %w[ke{y1f}oo ke{y2b}ar], c.send(:extract_keys, %w[mget ke{y1f}oo ke{y2b}ar]) + end + + def test_key_extraction_with_mset_command + c = build_client + assert_equal %w[foo bar], c.send(:extract_keys, %w[mset foo 1 bar 2]) + assert_equal %w[{key}foo {key}bar], c.send(:extract_keys, %w[mset {key}foo 1 {key}bar 2]) + assert_equal %w[{key1}foo {key2}bar], c.send(:extract_keys, %w[mset {key1}foo 1 {key2}bar 2]) + assert_equal %w[ke{y1f}oo ke{y2b}ar], c.send(:extract_keys, %w[mset ke{y1f}oo 1 ke{y2b}ar 2]) + end + + def test_key_extraction_with_pubsub_command + c = build_client + assert_equal %w[chan], c.send(:extract_keys, [:publish, 'chan', 'Hi']) + assert_equal %w[chan], c.send(:extract_keys, [:subscribe, 'chan']) + assert_equal %w[chan1 chan2], c.send(:extract_keys, [:subscribe, 'chan1', 'chan2']) + assert_equal %w[{chan}1 {chan}2], c.send(:extract_keys, [:subscribe, '{chan}1', '{chan}2']) + + assert_equal [], c.send(:extract_keys, [:psubscribe, 'channel']) + assert_equal [], c.send(:extract_keys, [:pubsub, 'channels', '*']) + assert_equal [], c.send(:extract_keys, [:punsubscribe, 'channel']) + assert_equal [], c.send(:extract_keys, [:unsubscribe, 'channel']) + end + + def test_key_extraction_with_blocking_command + c = build_client + assert_equal %w[key1 key2], c.send(:extract_keys, [:blpop, 'key1', 'key2', 1]) + + target_version('3.2.0') do + # There is a bug Redis 3.0's COMMAND command + assert_equal %w[key1 key2], c.send(:extract_keys, [:brpop, 'key1', 'key2', 1]) + end + + assert_equal %w[key1 key2], c.send(:extract_keys, [:brpoplpush, 'key1', 'key2', 1]) + + target_version('5.0.0') do + assert_equal %w[key1 key2], c.send(:extract_keys, [:bzpopmin, 'key1', 'key2', 1]) + assert_equal %w[key1 key2], c.send(:extract_keys, [:bzpopmax, 'key1', 'key2', 1]) + end + end + + def test_key_extraction_with_keyless_command + c = build_client + assert_equal [], c.send(:extract_keys, [:auth, 'password']) + assert_equal [], c.send(:extract_keys, %i[client kill]) + assert_equal [], c.send(:extract_keys, %i[cluster addslots]) + assert_equal [], c.send(:extract_keys, %i[command]) + assert_equal [], c.send(:extract_keys, %i[command count]) + assert_equal [], c.send(:extract_keys, %i[config get]) + assert_equal [], c.send(:extract_keys, %i[debug segfault]) + assert_equal [], c.send(:extract_keys, [:echo, 'Hello World']) + assert_equal [], c.send(:extract_keys, [:flushall, 'ASYNC']) + assert_equal [], c.send(:extract_keys, [:flushdb, 'ASYNC']) + assert_equal [], c.send(:extract_keys, [:info, 'cluster']) + assert_equal [], c.send(:extract_keys, %i[memory doctor]) + assert_equal [], c.send(:extract_keys, [:ping, 'Hi']) + assert_equal [], c.send(:extract_keys, %w[script exists sha1 sha1]) + assert_equal [], c.send(:extract_keys, [:select, 1]) + assert_equal [], c.send(:extract_keys, [:shutdown, 'SAVE']) + assert_equal [], c.send(:extract_keys, [:slaveof, '127.0.0.1', 6379]) + assert_equal [], c.send(:extract_keys, [:slowlog, 'get', 2]) + assert_equal [], c.send(:extract_keys, [:swapdb, 0, 1]) + assert_equal [], c.send(:extract_keys, [:wait, 1, 0]) + end + + def test_key_extraction_with_command_having_various_positional_key + c = build_client + assert_equal %w[key1 key2], c.send(:extract_keys, [:eval, 'script', 2, 'key1', 'key2', 'first', 'second']) + assert_equal [], c.send(:extract_keys, [:eval, 'return 0', 0]) + assert_equal %w[key1 key2], c.send(:extract_keys, [:evalsha, 'sha1', 2, 'key1', 'key2', 'first', 'second']) + assert_equal [], c.send(:extract_keys, [:evalsha, 'return 0', 0]) + assert_equal [], c.send(:extract_keys, [:migrate, '127.0.0.1', 6379, 'key1', 0, 5000]) + assert_equal %w[key1], c.send(:extract_keys, [:object, 'refcount', 'key1']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zinterstore, 'dest', 2, 'key1', 'key2', 'WEIGHTS', 2, 3]) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zinterstore, 'dest', 2, 'key1', 'key2', 'AGGREGATE', 'sum']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zinterstore, 'dest', 2, 'key1', 'key2', 'WEIGHTS', 2, 3, 'AGGREGATE', 'sum']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zinterstore, 'dest', 2, 'key1', 'key2']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zunionstore, 'dest', 2, 'key1', 'key2', 'WEIGHTS', 2, 3]) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zunionstore, 'dest', 2, 'key1', 'key2', 'AGGREGATE', 'sum']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zunionstore, 'dest', 2, 'key1', 'key2', 'WEIGHTS', 2, 3, 'AGGREGATE', 'sum']) + assert_equal %w[dest key1 key2], c.send(:extract_keys, [:zunionstore, 'dest', 2, 'key1', 'key2']) + + assert_equal %w[key1 key2 key3 key4 key5], c.send(:extract_keys, [:sort, 'key1', 'BY', 'key2', 'LIMIT', 0, 5, 'GET', 'key3', 'GET', 'key4', 'DESC', 'ALPHA', 'STORE', 'key5']).sort + + target_version('4.0.0') do + assert_equal %w[key1], c.send(:extract_keys, [:memory, :usage, 'key1']) + end + + target_version('5.0.0') do + assert_equal %w[s1 s2], c.send(:extract_keys, [:xread, 'COUNT', 2, 'STREAMS', 's1', 's2', 0, 0]) + assert_equal %w[s1], c.send(:extract_keys, [:xread, 'COUNT', 2, 'STREAMS', 's1', 0]) + assert_equal %w[s1 s2], c.send(:extract_keys, [:xreadgroup, 'GROUP', 'mygroup', 'Bob', 'COUNT', 2, 'STREAMS', 's1', 's2', '>', '>']) + assert_equal %w[s1], c.send(:extract_keys, [:xreadgroup, 'GROUP', 'mygroup', 'Bob', 'COUNT', 2, 'STREAMS', 's1', '>']) + end + end + + def test_multiple_node_handling_for_mget_command + c = build_client + range = (1..100) + range.each { |i| c.call(['set', "key#{i}", i]) } + keys = range.map { |i| "key#{i}" } + expected = range.map(&:to_s) + actual = c.send(:send_mget_command, ['mget'] + keys) + assert_equal expected, actual + end + + def test_multiple_node_handling_for_mget_command_with_tag + c = build_client + range = (1..100) + range.each { |i| c.call(['set', "{key}#{i}", i]) } + keys = range.map { |i| "{key}#{i}" } + expected = range.map(&:to_s) + actual = c.send(:send_mget_command, ['mget'] + keys) + assert_equal expected, actual + end + + def test_send_command + c = build_client + assert_raise(Redis::Distributed::CannotDistribute) { c.send(:send_command, ['eval', 'return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}', '2', 'key1', 'key4', 'first', 'second']) } + end + + def test_assert_same_node! + c = build_client + assert_raise(Redis::Distributed::CannotDistribute) { c.send(:assert_same_node!, %w[key1 key4]) } + assert_equal nil, c.send(:assert_same_node!, %w[key1 key2]) + assert_equal nil, c.send(:assert_same_node!, %w[key3 key4]) + assert_raise(Redis::Distributed::CannotDistribute) { c.send(:assert_same_node!, %w[a b]) } + assert_equal nil, c.send(:assert_same_node!, %w[b c]) + assert_equal nil, c.send(:assert_same_node!, %w[c d]) + assert_equal nil, c.send(:assert_same_node!, %w[foo bar baz zap]) + assert_equal nil, c.send(:assert_same_node!, %w[hoge fuga]) + end +end diff --git a/test/distributed_publish_subscribe_test.rb b/test/distributed_publish_subscribe_test.rb index a5fcfa53f..429a71326 100644 --- a/test/distributed_publish_subscribe_test.rb +++ b/test/distributed_publish_subscribe_test.rb @@ -6,11 +6,7 @@ class TestDistributedPublishSubscribe < Test::Unit::TestCase def test_subscribe_and_unsubscribe assert_raise Redis::Distributed::CannotDistribute do - r.subscribe("foo", "bar") { } - end - - assert_raise Redis::Distributed::CannotDistribute do - r.subscribe("{qux}foo", "bar") { } + r.subscribe("key1", "key4") { } end end @@ -42,7 +38,7 @@ def test_subscribe_and_unsubscribe_with_tags # Wait until the subscription is active before publishing Wire.pass while !@subscribed - Redis::Distributed.new(NODES).publish("foo", "s1") + build_another_client.publish('foo', 's1') wire.join diff --git a/test/distributed_remote_server_control_commands_test.rb b/test/distributed_remote_server_control_commands_test.rb index 66ec349f8..40fae2ad9 100644 --- a/test/distributed_remote_server_control_commands_test.rb +++ b/test/distributed_remote_server_control_commands_test.rb @@ -27,7 +27,7 @@ def test_info def test_info_commandstats target_version "2.5.7" do - r.nodes.each { |n| n.config(:resetstat) } + r.nodes.each { |n| n.call(%i[config resetstat]) } r.ping # Executed on every node r.info(:commandstats).each do |info| @@ -37,12 +37,7 @@ def test_info_commandstats end def test_monitor - begin - r.monitor - rescue Exception => ex - ensure - assert ex.kind_of?(NotImplementedError) - end + assert_raise(Redis::Distributed::CannotDistribute) { r.monitor { :dummy } } end def test_echo diff --git a/test/distributed_scripting_test.rb b/test/distributed_scripting_test.rb index 224ab2b05..33b6f0906 100644 --- a/test/distributed_scripting_test.rb +++ b/test/distributed_scripting_test.rb @@ -1,7 +1,6 @@ require_relative "helper" class TestDistributedScripting < Test::Unit::TestCase - include Helper::Distributed def to_sha(script) @@ -9,92 +8,84 @@ def to_sha(script) end def test_script_exists - target_version "2.5.9" do # 2.6-rc1 - a = to_sha("return 1") + target_version '2.5.9' do # 2.6-rc1 + a = to_sha('return 1') b = a.succ - assert_equal [true], r.script(:exists, a) - assert_equal [false], r.script(:exists, b) - assert_equal [[true]], r.script(:exists, [a]) - assert_equal [[false]], r.script(:exists, [b]) - assert_equal [[true, false]], r.script(:exists, [a, b]) + assert_equal true, r.script(:exists, a) + assert_equal false, r.script(:exists, b) + assert_equal [true], r.script(:exists, [a]) + assert_equal [false], r.script(:exists, [b]) + assert_equal [true, false], r.script(:exists, [a, b]) end end def test_script_flush - target_version "2.5.9" do # 2.6-rc1 - sha = to_sha("return 1") - assert r.script(:exists, sha).first - assert_equal ["OK"], r.script(:flush) - assert !r.script(:exists, sha).first + target_version '2.5.9' do # 2.6-rc1 + sha = to_sha('return 1') + assert r.script(:exists, sha) + assert_equal %w[OK OK], r.script(:flush) + assert !r.script(:exists, sha) end end def test_script_kill - target_version "2.5.9" do # 2.6-rc1 - redis_mock(:script => lambda { |arg| "+#{arg.upcase}" }) do |redis| - assert_equal ["KILL"], redis.script(:kill) + target_version '2.5.9' do # 2.6-rc1 + redis_mock(script: ->(arg) { "+#{arg.upcase}" }) do |redis| + assert_equal ['KILL'], redis.script(:kill) end end end def test_eval - target_version "2.5.9" do # 2.6-rc1 - assert_raises(Redis::Distributed::CannotDistribute) do - r.eval("return #KEYS") - end + target_version '2.5.9' do # 2.6-rc1 + assert_equal 1, r.eval('return 1') assert_raises(Redis::Distributed::CannotDistribute) do - r.eval("return KEYS", ["k1", "k2"]) + r.eval('return KEYS', %w[key1 key4]) end - assert_equal ["k1"], r.eval("return KEYS", ["k1"]) - assert_equal ["a1", "a2"], r.eval("return ARGV", ["k1"], ["a1", "a2"]) + assert_equal ['k1'], r.eval('return KEYS', ['k1']) + assert_equal %w[a1 a2], r.eval('return ARGV', ['k1'], %w[a1 a2]) end end def test_eval_with_options_hash - target_version "2.5.9" do # 2.6-rc1 - assert_raises(Redis::Distributed::CannotDistribute) do - r.eval("return #KEYS", {}) - end + target_version '2.5.9' do # 2.6-rc1 + assert_equal 1, r.eval('return 1', {}) assert_raises(Redis::Distributed::CannotDistribute) do - r.eval("return KEYS", { :keys => ["k1", "k2"] }) + r.eval('return KEYS', keys: %w[key1 key4]) end - assert_equal ["k1"], r.eval("return KEYS", { :keys => ["k1"] }) - assert_equal ["a1", "a2"], r.eval("return ARGV", { :keys => ["k1"], :argv => ["a1", "a2"] }) + assert_equal ['k1'], r.eval('return KEYS', keys: ['k1']) + assert_equal %w[a1 a2], r.eval('return ARGV', keys: ['k1'], argv: %w[a1 a2]) end end def test_evalsha - target_version "2.5.9" do # 2.6-rc1 - assert_raises(Redis::Distributed::CannotDistribute) do - r.evalsha(to_sha("return #KEYS")) - end + target_version '2.5.9' do # 2.6-rc1 + assert_equal 1, r.evalsha(to_sha('return 1')) assert_raises(Redis::Distributed::CannotDistribute) do - r.evalsha(to_sha("return KEYS"), ["k1", "k2"]) + r.evalsha(to_sha('return KEYS'), %w[key1 key4]) end - assert_equal ["k1"], r.evalsha(to_sha("return KEYS"), ["k1"]) - assert_equal ["a1", "a2"], r.evalsha(to_sha("return ARGV"), ["k1"], ["a1", "a2"]) + assert_equal ['k1'], r.evalsha(to_sha('return KEYS'), ['k1']) + assert_equal %w[a1 a2], r.evalsha(to_sha('return ARGV'), ['k1'], %w[a1 a2]) end end def test_evalsha_with_options_hash - target_version "2.5.9" do # 2.6-rc1 - assert_raises(Redis::Distributed::CannotDistribute) do - r.evalsha(to_sha("return #KEYS"), {}) - end + target_version '2.5.9' do # 2.6-rc1 + assert_equal 1, r.evalsha(to_sha('return 1'), {}) assert_raises(Redis::Distributed::CannotDistribute) do - r.evalsha(to_sha("return KEYS"), { :keys => ["k1", "k2"] }) + r.evalsha(to_sha('return KEYS'), keys: %w[key1 key4]) end - assert_equal ["k1"], r.evalsha(to_sha("return KEYS"), { :keys => ["k1"] }) - assert_equal ["a1", "a2"], r.evalsha(to_sha("return ARGV"), { :keys => ["k1"], :argv => ["a1", "a2"] }) + assert_equal ['k1'], r.evalsha(to_sha('return KEYS'), keys: ['k1']) + assert_equal %w[a1 a2], r.evalsha(to_sha('return ARGV'), keys: ['k1'], argv: %w[a1 a2]) end end end diff --git a/test/distributed_sorting_test.rb b/test/distributed_sorting_test.rb index 7afb670b9..f2a6cb468 100644 --- a/test/distributed_sorting_test.rb +++ b/test/distributed_sorting_test.rb @@ -6,13 +6,13 @@ class TestDistributedSorting < Test::Unit::TestCase def test_sort assert_raise(Redis::Distributed::CannotDistribute) do - r.set("foo:1", "s1") - r.set("foo:2", "s2") + r.set("key1:1", "s1") + r.set("key1:2", "s2") - r.rpush("bar", "1") - r.rpush("bar", "2") + r.rpush("key4", "1") + r.rpush("key4", "2") - r.sort("bar", :get => "foo:*", :limit => [0, 1]) + r.sort("key4", :get => "key1:*", :limit => [0, 1]) end end end diff --git a/test/distributed_test.rb b/test/distributed_test.rb index b19e5329b..b1afce7b3 100644 --- a/test/distributed_test.rb +++ b/test/distributed_test.rb @@ -1,56 +1,63 @@ require_relative "helper" class TestDistributed < Test::Unit::TestCase - include Helper::Distributed def test_handle_multiple_servers - @r = Redis::Distributed.new ["redis://127.0.0.1:#{PORT}/15", *NODES] - 100.times do |idx| - @r.set(idx.to_s, "foo#{idx}") + r.set(idx.to_s, "foo#{idx}") end 100.times do |idx| - assert_equal "foo#{idx}", @r.get(idx.to_s) + assert_equal "foo#{idx}", r.get(idx.to_s) end - assert_equal "0", @r.keys("*").sort.first - assert_equal "string", @r.type("1") + assert_equal '0', r.keys('*').min + assert_equal 'string', r.type('1') end def test_add_nodes - logger = Logger.new("/dev/null") + logger = Logger.new('/dev/null') - @r = Redis::Distributed.new NODES, :logger => logger, :timeout => 10 + r = build_another_client(distributed: { nodes: NODES }, logger: logger, timeout: 10) - assert_equal "127.0.0.1", @r.nodes[0]._client.host - assert_equal PORT, @r.nodes[0]._client.port - assert_equal 15, @r.nodes[0]._client.db - assert_equal 10, @r.nodes[0]._client.timeout - assert_equal logger, @r.nodes[0]._client.logger + assert_equal '127.0.0.1', r.nodes[0].host + assert_equal PORT, r.nodes[0].port + assert_equal DB, r.nodes[0].db + assert_equal 10, r.nodes[0].timeout + assert_equal logger, r.nodes[0].logger - @r.add_node("redis://127.0.0.1:6380/14") + r.add_node("redis://127.0.0.1:#{PORT}/14") - assert_equal "127.0.0.1", @r.nodes[1]._client.host - assert_equal 6380, @r.nodes[1]._client.port - assert_equal 14, @r.nodes[1]._client.db - assert_equal 10, @r.nodes[1]._client.timeout - assert_equal logger, @r.nodes[1]._client.logger + assert_equal '127.0.0.1', r.nodes[1].host + assert_equal NODE2_PORT, r.nodes[1].port + assert_equal DB, r.nodes[1].db + assert_equal 10, r.nodes[1].timeout + assert_equal logger, r.nodes[1].logger end def test_pipelining_commands_cannot_be_distributed assert_raise Redis::Distributed::CannotDistribute do r.pipelined do - r.lpush "foo", "s1" - r.lpush "foo", "s2" + r.lpush 'foo', 's1' + r.lpush 'foo', 's2' end end end def test_unknown_commands_does_not_work_by_default - assert_raise NoMethodError do + assert_raise Redis::CommandError do r.not_yet_implemented_command end end + + def test_backward_compatible_class + rd = Redis::Distributed.new(NODES, timeout: TIMEOUT) + rd.set('foo', '1') + assert_equal '1', rd.get('foo') + + hr = Redis::HashRing.new + hr.add_node(r) + assert_equal r, hr.get_node('foo') + end end diff --git a/test/distributed_transactions_test.rb b/test/distributed_transactions_test.rb index 526e96d45..f091e3a1f 100644 --- a/test/distributed_transactions_test.rb +++ b/test/distributed_transactions_test.rb @@ -1,18 +1,13 @@ require_relative "helper" class TestDistributedTransactions < Test::Unit::TestCase - include Helper::Distributed def test_multi_discard - @foo = nil - assert_raise Redis::Distributed::CannotDistribute do - r.multi { @foo = 1 } + r.multi { :dummy } end - assert_equal nil, @foo - assert_raise Redis::Distributed::CannotDistribute do r.discard end diff --git a/test/helper.rb b/test/helper.rb index 04baf9813..518516918 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -17,7 +17,7 @@ PORT = 6381 DB = 15 -TIMEOUT = Float(ENV['TIMEOUT'] || 0.1) +TIMEOUT = Float(ENV['TIMEOUT'] || 1.0) LOW_TIMEOUT = Float(ENV['LOW_TIMEOUT'] || 0.01) # for blocking-command tests OPTIONS = { port: PORT, db: DB, timeout: TIMEOUT }.freeze @@ -186,23 +186,56 @@ def _new_client(options = {}) module Distributed include Generic - NODES = ["redis://127.0.0.1:#{PORT}/#{DB}"].freeze + NODE2_PORT = 6383 + NODES = [PORT, NODE2_PORT].map { |port| "redis://127.0.0.1:#{port}/#{DB}" }.freeze + + def redis_mock(commands, options = {}) + commands[:command] = ->(*_) { "*0\r\n" } + + RedisMock.start(commands, options) do |port| + yield _new_client(options.merge(distributed: { nodes: ["redis://127.0.0.1:#{port}"] })) + end + end def version - Version.new(redis.info.first["redis_version"]) + Version.new(redis.info.first['redis_version']) + end + + def build_another_client(options = {}) + _new_client(options) + end + + def init(redis) + redis.flushall + redis + rescue Redis::CannotConnectError + puts <<-MSG + + Cannot connect to Redis instances. + + Make sure Redis is running on localhost, port #{PORT}, #{NODE2_PORT}. + + Try this once: + + $ make stop_all + + Then run the build again: + + $ make start_all + + MSG + exit 1 end private def _format_options(options) - { - :timeout => OPTIONS[:timeout], - :logger => ::Logger.new(@log), - }.merge(options) + { distributed: { nodes: NODES }, + timeout: TIMEOUT, logger: ::Logger.new(@log) }.merge(options) end def _new_client(options = {}) - Redis::Distributed.new(NODES, _format_options(options).merge(:driver => ENV["conn"])) + Redis.new(_format_options(options).merge(driver: ENV['DRIVER'])) end end