From bc4be3f63a5caa745131aece61174e65867cd407 Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Fri, 4 Sep 2020 02:42:26 +0530 Subject: [PATCH 1/8] Remove `filterKeysByType` step when getting data from redis Instead of filtering keys by type beforehand, assume all keys are of the expected type and handle exceptions later if we encounter a wrong type. --- .../provider/redis/rdd/RedisRDD.scala | 74 ++++++++++++++----- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 142a1c03..1eb8a3b7 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -7,11 +7,13 @@ import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode} import org.apache.spark._ import org.apache.spark.rdd.RDD +import redis.clients.jedis.exceptions.JedisDataException import redis.clients.jedis.{Jedis, ScanParams} import redis.clients.jedis.util.JedisClusterCRC16 import scala.collection.JavaConversions._ import scala.reflect.{ClassTag, classTag} +import scala.util.{Failure, Success, Try} class RedisKVRDD(prev: RDD[String], @@ -38,11 +40,22 @@ class RedisKVRDD(prev: RDD[String], def getKV(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val stringKeys = filterKeysByType(conn, nodeKeys, "string") - val response = mapWithPipeline(conn, stringKeys) { (pipeline, key) => + + val response = mapWithPipeline(conn, nodeKeys) { (pipeline, key) => pipeline.get(key) } - val res = stringKeys.zip(response).iterator.asInstanceOf[Iterator[(String, String)]] + + val res = nodeKeys.zip(response) + .flatMap{ + // Silently filter out this exception, when the value doesn't match the expected type "String" + case (_, e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + // Throw any other JedisDataException we encounter + case (_, e: JedisDataException) => throw e + case (k, v: String) => Some(k, v) + // Default case, that shouldn't really happen + case _ => None + } + .iterator conn.close() res } @@ -51,8 +64,13 @@ class RedisKVRDD(prev: RDD[String], def getHASH(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val hashKeys = filterKeysByType(conn, nodeKeys, "hash") - val res = hashKeys.flatMap(conn.hgetAll).iterator + val res = nodeKeys.flatMap{k => + Try(conn.hgetAll(k).toMap) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }.flatten.iterator conn.close() res } @@ -80,8 +98,14 @@ class RedisListRDD(prev: RDD[String], def getSET(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val setKeys = filterKeysByType(conn, nodeKeys, "set") - val res = setKeys.flatMap(conn.smembers).iterator + val res: Iterator[String] = nodeKeys.flatMap{k => + Try(conn.smembers(k).toSet) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }.flatten + .iterator conn.close() res } @@ -90,8 +114,13 @@ class RedisListRDD(prev: RDD[String], def getLIST(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val listKeys = filterKeysByType(conn, nodeKeys, "list") - val res = listKeys.flatMap(conn.lrange(_, 0, -1)).iterator + val res = nodeKeys.flatMap{ k => + Try(conn.lrange(k, 0, -1)) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) =>Some(value) + } + }.flatten.iterator conn.close() res } @@ -133,13 +162,24 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], endPos: Long): Iterator[T] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val zsetKeys = filterKeysByType(conn, nodeKeys, "zset") val res = { if (classTag[T] == classTag[(String, Double)]) { - zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos)). - map(tup => (tup.getElement, tup.getScore)).iterator + nodeKeys.flatMap{k => + Try(conn.zrangeWithScores(k, startPos, endPos)) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }.flatten + .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { - zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos)).iterator + nodeKeys.flatMap{k => + Try(conn.zrange(k, startPos, endPos)) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } @@ -155,13 +195,13 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], endScore: Double): Iterator[T] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val zsetKeys = filterKeysByType(conn, nodeKeys, "zset") val res = { if (classTag[T] == classTag[(String, Double)]) { - zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore)). - map(tup => (tup.getElement, tup.getScore)).iterator + nodeKeys.flatMap(k => Try(conn.zrangeByScoreWithScores(k, startScore, endScore)).toOption). + flatten + .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { - zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore)).iterator + nodeKeys.flatMap(k => Try(conn.zrangeByScore(k, startScore, endScore)).toOption).flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } From d264b9cf7ff9f5600fe3d2da74b52c4af63ec6f8 Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Fri, 4 Sep 2020 04:06:41 +0530 Subject: [PATCH 2/8] Add tests for wrong key types for redisRDD functions --- .../redislabs/provider/redis/rdd/RedisRddSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala index 807ef6a1..7f41ee68 100644 --- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala @@ -52,9 +52,11 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { test("RedisKVRDD") { val redisKVRDD = sc.fromRedisKV("*") val kvContents = redisKVRDD.sortByKey().collect + val wrongTypeKeysRes = List(hashKey, zSetKey, listKey, setKey).map(sc.fromRedisKV(_).collect) val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) kvContents shouldBe wcnts + all(wrongTypeKeysRes) should have size 0 } test("RedisZsetRDD") { @@ -77,6 +79,7 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val redisZRangeByScore = sc.fromRedisZRangeByScore(zSetKey, 3, 9) val zrangeByScore = redisZRangeByScore.collect.sorted + val wrongTypeKeysRes = List(hashKey, setKey, listKey, contentWords(0)).map(sc.fromRedisZSetWithScore(_).collect) val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toDouble)) @@ -87,6 +90,7 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { zrangeByScoreWithScore should be(wcnts.toArray.filter(x => x._2 >= 3 && x._2 <= 9) .sortBy(x => (x._2, x._1))) zrangeByScore should be(wcnts.toArray.filter(x => x._2 >= 3 && x._2 <= 9).map(_._1).sorted) + all(wrongTypeKeysRes) should have length 0 } test("RedisHashRDD") { @@ -94,21 +98,27 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val hashContents = redisHashRDD.sortByKey().collect val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) + val wrongTypeKeysRes = List(zSetKey, setKey, listKey, contentWords(0)).map(sc.fromRedisHash(_).collect) hashContents should be(wcnts) + all(wrongTypeKeysRes) should have length 0 } test("RedisListRDD") { val redisListRDD = sc.fromRedisList(listKey) val listContents = redisListRDD.sortBy(x => x).collect val ws = contentWords.sorted + val wrongTypeKeysRes = List(zSetKey, setKey, hashKey, contentWords(0)).map(sc.fromRedisList(_).collect) listContents should be(ws) + all(wrongTypeKeysRes) should have length 0 } test("RedisSetRDD") { val redisSetRDD = sc.fromRedisSet(setKey) val setContents = redisSetRDD.sortBy(x => x).collect val ws = content.split("\\W+").filter(!_.isEmpty).distinct.sorted + val wrongTypeKeysRes = List(zSetKey, listKey, hashKey, contentWords(0)).map(sc.fromRedisSet(_).collect) setContents should be(ws) + all(wrongTypeKeysRes) should have length 0 } test("Expire") { From 3ebdcff030ef56b3841eab656adeb718fab8a120 Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Fri, 4 Sep 2020 04:10:21 +0530 Subject: [PATCH 3/8] Missed changes to getZSetByScore --- .../redislabs/provider/redis/rdd/RedisRDD.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 1eb8a3b7..c5a92eb2 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -197,11 +197,23 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], val conn = node.endpoint.connect() val res = { if (classTag[T] == classTag[(String, Double)]) { - nodeKeys.flatMap(k => Try(conn.zrangeByScoreWithScores(k, startScore, endScore)).toOption). + nodeKeys.flatMap{k => + Try(conn.zrangeByScoreWithScores(k, startScore, endScore)) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }. flatten .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { - nodeKeys.flatMap(k => Try(conn.zrangeByScore(k, startScore, endScore)).toOption).flatten.iterator + nodeKeys.flatMap{ k => + Try(conn.zrangeByScore(k, startScore, endScore)) match { + case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None + case Failure(e) => throw e + case Success(value) => Some(value) + } + }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } From e978dcca7d3151141c52f962e0a38534b320bdaf Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Tue, 8 Sep 2020 01:58:30 +0530 Subject: [PATCH 4/8] Reduce exception-ignoring code duplication --- .../provider/redis/rdd/RedisRDD.scala | 43 ++++--------------- .../provider/redis/util/ParseUtils.scala | 16 +++++-- 2 files changed, 20 insertions(+), 39 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index c5a92eb2..836ac167 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -3,6 +3,7 @@ package com.redislabs.provider.redis.rdd import java.util import com.redislabs.provider.redis.partitioner._ +import com.redislabs.provider.redis.util.ParseUtils.ignoreJedisWrongTypeException import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode} import org.apache.spark._ @@ -65,11 +66,7 @@ class RedisKVRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res = nodeKeys.flatMap{k => - Try(conn.hgetAll(k).toMap) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.hgetAll(k).toMap)).get }.flatten.iterator conn.close() res @@ -99,11 +96,7 @@ class RedisListRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res: Iterator[String] = nodeKeys.flatMap{k => - Try(conn.smembers(k).toSet) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.smembers(k).toSet)).get }.flatten .iterator conn.close() @@ -115,11 +108,7 @@ class RedisListRDD(prev: RDD[String], groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() val res = nodeKeys.flatMap{ k => - Try(conn.lrange(k, 0, -1)) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) =>Some(value) - } + ignoreJedisWrongTypeException(Try(conn.lrange(k, 0, -1))).get }.flatten.iterator conn.close() res @@ -165,20 +154,12 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], val res = { if (classTag[T] == classTag[(String, Double)]) { nodeKeys.flatMap{k => - Try(conn.zrangeWithScores(k, startPos, endPos)) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos))).get }.flatten .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { nodeKeys.flatMap{k => - Try(conn.zrange(k, startPos, endPos)) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos))).get }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") @@ -198,21 +179,13 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], val res = { if (classTag[T] == classTag[(String, Double)]) { nodeKeys.flatMap{k => - Try(conn.zrangeByScoreWithScores(k, startScore, endScore)) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.zrangeByScoreWithScores(k, startScore, endScore))).get }. flatten .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { nodeKeys.flatMap{ k => - Try(conn.zrangeByScore(k, startScore, endScore)) match { - case Failure(e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - case Failure(e) => throw e - case Success(value) => Some(value) - } + ignoreJedisWrongTypeException(Try(conn.zrangeByScore(k, startScore, endScore))).get }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") diff --git a/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala index 9307d413..9eeb264f 100644 --- a/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala +++ b/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala @@ -1,11 +1,11 @@ package com.redislabs.provider.redis.util -import java.lang.{ - Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, - Short => JShort -} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import org.apache.spark.sql.types._ +import redis.clients.jedis.exceptions.JedisDataException + +import scala.util.{Failure, Success, Try} /** * @author The Viet Nguyen @@ -41,4 +41,12 @@ object ParseUtils { case TimestampType => java.sql.Timestamp.valueOf(fieldValueStr) case _ => fieldValueStr } + + private[redis] def ignoreJedisWrongTypeException[T](tried: Try[T]): Try[Option[T]] = { + tried.transform(s => Success(Some(s)), { + // Swallow this exception + case e: JedisDataException if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => Success(None) + case e: Throwable => Failure(e) + }) + } } From a8f025ba6f8fa1829aca83c3126f1f693b10ebc6 Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Tue, 8 Sep 2020 02:02:51 +0530 Subject: [PATCH 5/8] Do not silently ignore exceptions --- .../scala/com/redislabs/provider/redis/rdd/RedisRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 836ac167..45d103b9 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -50,8 +50,8 @@ class RedisKVRDD(prev: RDD[String], .flatMap{ // Silently filter out this exception, when the value doesn't match the expected type "String" case (_, e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - // Throw any other JedisDataException we encounter - case (_, e: JedisDataException) => throw e + // Throw any other Exception we encounter + case (_, e: Throwable) => throw e case (k, v: String) => Some(k, v) // Default case, that shouldn't really happen case _ => None From d55c5cf47ec5c87f84c2cb414a6117c66a8b21be Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Thu, 10 Sep 2020 01:57:55 +0530 Subject: [PATCH 6/8] Keep filtering out `wrongtype` exceptions at a single place --- .../com/redislabs/provider/redis/rdd/RedisRDD.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 45d103b9..397ba749 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -8,7 +8,6 @@ import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode} import org.apache.spark._ import org.apache.spark.rdd.RDD -import redis.clients.jedis.exceptions.JedisDataException import redis.clients.jedis.{Jedis, ScanParams} import redis.clients.jedis.util.JedisClusterCRC16 @@ -47,16 +46,14 @@ class RedisKVRDD(prev: RDD[String], } val res = nodeKeys.zip(response) + .view .flatMap{ - // Silently filter out this exception, when the value doesn't match the expected type "String" - case (_, e: JedisDataException) if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => None - // Throw any other Exception we encounter - case (_, e: Throwable) => throw e - case (k, v: String) => Some(k, v) - // Default case, that shouldn't really happen + case (_, e: Throwable) => Some(Failure(e)) + case (k, v: String) => Some(Success((k,v))) case _ => None - } + }.flatMap(ignoreJedisWrongTypeException(_).get) // Unwrap `Try` to throw exceptions if any .iterator + conn.close() res } From 01cc5192ec1423bbef3286421c1a74a80574ad2e Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Thu, 10 Sep 2020 02:05:18 +0530 Subject: [PATCH 7/8] Add explicit test with a missing redis key --- .../provider/redis/rdd/RedisRddSuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala index 7f41ee68..58bea0dc 100644 --- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala @@ -22,6 +22,7 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val hashKey: String = "all:words:cnt:hash" val listKey: String = "all:words:list" val setKey: String = "all:words:set" + val missingRedisKey: String = "missingRedisKey" override def beforeAll() { super.beforeAll() @@ -53,10 +54,12 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val redisKVRDD = sc.fromRedisKV("*") val kvContents = redisKVRDD.sortByKey().collect val wrongTypeKeysRes = List(hashKey, zSetKey, listKey, setKey).map(sc.fromRedisKV(_).collect) + val missingKeyRes = sc.fromRedisKV(missingRedisKey).collect() val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) kvContents shouldBe wcnts all(wrongTypeKeysRes) should have size 0 + missingKeyRes should have size 0 } test("RedisZsetRDD") { @@ -80,6 +83,8 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val zrangeByScore = redisZRangeByScore.collect.sorted val wrongTypeKeysRes = List(hashKey, setKey, listKey, contentWords(0)).map(sc.fromRedisZSetWithScore(_).collect) + val missingKeyRes = sc.fromRedisZSetWithScore(missingRedisKey).collect() + val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toDouble)) @@ -91,6 +96,7 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { .sortBy(x => (x._2, x._1))) zrangeByScore should be(wcnts.toArray.filter(x => x._2 >= 3 && x._2 <= 9).map(_._1).sorted) all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisHashRDD") { @@ -99,8 +105,11 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) val wrongTypeKeysRes = List(zSetKey, setKey, listKey, contentWords(0)).map(sc.fromRedisHash(_).collect) + val missingKeyRes = sc.fromRedisHash(missingRedisKey).collect() + hashContents should be(wcnts) all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisListRDD") { @@ -108,8 +117,11 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val listContents = redisListRDD.sortBy(x => x).collect val ws = contentWords.sorted val wrongTypeKeysRes = List(zSetKey, setKey, hashKey, contentWords(0)).map(sc.fromRedisList(_).collect) + val missingKeyRes = sc.fromRedisList(missingRedisKey).collect() + listContents should be(ws) all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisSetRDD") { @@ -117,8 +129,11 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val setContents = redisSetRDD.sortBy(x => x).collect val ws = content.split("\\W+").filter(!_.isEmpty).distinct.sorted val wrongTypeKeysRes = List(zSetKey, listKey, hashKey, contentWords(0)).map(sc.fromRedisSet(_).collect) + val missingKeyRes = sc.fromRedisSet(missingRedisKey).collect() + setContents should be(ws) all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("Expire") { From 218eddf143cd279b40854a698184fd308346592d Mon Sep 17 00:00:00 2001 From: Mayank Asthana Date: Thu, 10 Sep 2020 15:09:00 +0530 Subject: [PATCH 8/8] Do not use view --- src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 397ba749..40417451 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -46,7 +46,6 @@ class RedisKVRDD(prev: RDD[String], } val res = nodeKeys.zip(response) - .view .flatMap{ case (_, e: Throwable) => Some(Failure(e)) case (k, v: String) => Some(Success((k,v)))