diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 142a1c03..40417451 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -3,6 +3,7 @@ package com.redislabs.provider.redis.rdd import java.util import com.redislabs.provider.redis.partitioner._ +import com.redislabs.provider.redis.util.ParseUtils.ignoreJedisWrongTypeException import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode} import org.apache.spark._ @@ -12,6 +13,7 @@ import redis.clients.jedis.util.JedisClusterCRC16 import scala.collection.JavaConversions._ import scala.reflect.{ClassTag, classTag} +import scala.util.{Failure, Success, Try} class RedisKVRDD(prev: RDD[String], @@ -38,11 +40,19 @@ class RedisKVRDD(prev: RDD[String], def getKV(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val stringKeys = filterKeysByType(conn, nodeKeys, "string") - val response = mapWithPipeline(conn, stringKeys) { (pipeline, key) => + + val response = mapWithPipeline(conn, nodeKeys) { (pipeline, key) => pipeline.get(key) } - val res = stringKeys.zip(response).iterator.asInstanceOf[Iterator[(String, String)]] + + val res = nodeKeys.zip(response) + .flatMap{ + case (_, e: Throwable) => Some(Failure(e)) + case (k, v: String) => Some(Success((k,v))) + case _ => None + }.flatMap(ignoreJedisWrongTypeException(_).get) // Unwrap `Try` to throw exceptions if any + .iterator + conn.close() res } @@ -51,8 +61,9 @@ class RedisKVRDD(prev: RDD[String], def getHASH(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[(String, String)] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val hashKeys = filterKeysByType(conn, nodeKeys, "hash") - val res = hashKeys.flatMap(conn.hgetAll).iterator + val res = nodeKeys.flatMap{k => + ignoreJedisWrongTypeException(Try(conn.hgetAll(k).toMap)).get + }.flatten.iterator conn.close() res } @@ -80,8 +91,10 @@ class RedisListRDD(prev: RDD[String], def getSET(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val setKeys = filterKeysByType(conn, nodeKeys, "set") - val res = setKeys.flatMap(conn.smembers).iterator + val res: Iterator[String] = nodeKeys.flatMap{k => + ignoreJedisWrongTypeException(Try(conn.smembers(k).toSet)).get + }.flatten + .iterator conn.close() res } @@ -90,8 +103,9 @@ class RedisListRDD(prev: RDD[String], def getLIST(nodes: Array[RedisNode], keys: Iterator[String]): Iterator[String] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val listKeys = filterKeysByType(conn, nodeKeys, "list") - val res = listKeys.flatMap(conn.lrange(_, 0, -1)).iterator + val res = nodeKeys.flatMap{ k => + ignoreJedisWrongTypeException(Try(conn.lrange(k, 0, -1))).get + }.flatten.iterator conn.close() res } @@ -133,13 +147,16 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], endPos: Long): Iterator[T] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val zsetKeys = filterKeysByType(conn, nodeKeys, "zset") val res = { if (classTag[T] == classTag[(String, Double)]) { - zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos)). - map(tup => (tup.getElement, tup.getScore)).iterator + nodeKeys.flatMap{k => + ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos))).get + }.flatten + .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { - zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos)).iterator + nodeKeys.flatMap{k => + ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos))).get + }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } @@ -155,13 +172,17 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], endScore: Double): Iterator[T] = { groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) => val conn = node.endpoint.connect() - val zsetKeys = filterKeysByType(conn, nodeKeys, "zset") val res = { if (classTag[T] == classTag[(String, Double)]) { - zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore)). - map(tup => (tup.getElement, tup.getScore)).iterator + nodeKeys.flatMap{k => + ignoreJedisWrongTypeException(Try(conn.zrangeByScoreWithScores(k, startScore, endScore))).get + }. + flatten + .map(tup => (tup.getElement, tup.getScore)).iterator } else if (classTag[T] == classTag[String]) { - zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore)).iterator + nodeKeys.flatMap{ k => + ignoreJedisWrongTypeException(Try(conn.zrangeByScore(k, startScore, endScore))).get + }.flatten.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } diff --git a/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala b/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala index 9307d413..9eeb264f 100644 --- a/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala +++ b/src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala @@ -1,11 +1,11 @@ package com.redislabs.provider.redis.util -import java.lang.{ - Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, - Short => JShort -} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import org.apache.spark.sql.types._ +import redis.clients.jedis.exceptions.JedisDataException + +import scala.util.{Failure, Success, Try} /** * @author The Viet Nguyen @@ -41,4 +41,12 @@ object ParseUtils { case TimestampType => java.sql.Timestamp.valueOf(fieldValueStr) case _ => fieldValueStr } + + private[redis] def ignoreJedisWrongTypeException[T](tried: Try[T]): Try[Option[T]] = { + tried.transform(s => Success(Some(s)), { + // Swallow this exception + case e: JedisDataException if Option(e.getMessage).getOrElse("").contains("WRONGTYPE") => Success(None) + case e: Throwable => Failure(e) + }) + } } diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala index 807ef6a1..58bea0dc 100644 --- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala @@ -22,6 +22,7 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val hashKey: String = "all:words:cnt:hash" val listKey: String = "all:words:list" val setKey: String = "all:words:set" + val missingRedisKey: String = "missingRedisKey" override def beforeAll() { super.beforeAll() @@ -52,9 +53,13 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { test("RedisKVRDD") { val redisKVRDD = sc.fromRedisKV("*") val kvContents = redisKVRDD.sortByKey().collect + val wrongTypeKeysRes = List(hashKey, zSetKey, listKey, setKey).map(sc.fromRedisKV(_).collect) + val missingKeyRes = sc.fromRedisKV(missingRedisKey).collect() val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) kvContents shouldBe wcnts + all(wrongTypeKeysRes) should have size 0 + missingKeyRes should have size 0 } test("RedisZsetRDD") { @@ -77,6 +82,9 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val redisZRangeByScore = sc.fromRedisZRangeByScore(zSetKey, 3, 9) val zrangeByScore = redisZRangeByScore.collect.sorted + val wrongTypeKeysRes = List(hashKey, setKey, listKey, contentWords(0)).map(sc.fromRedisZSetWithScore(_).collect) + val missingKeyRes = sc.fromRedisZSetWithScore(missingRedisKey).collect() + val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toDouble)) @@ -87,6 +95,8 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { zrangeByScoreWithScore should be(wcnts.toArray.filter(x => x._2 >= 3 && x._2 <= 9) .sortBy(x => (x._2, x._1))) zrangeByScore should be(wcnts.toArray.filter(x => x._2 >= 3 && x._2 <= 9).map(_._1).sorted) + all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisHashRDD") { @@ -94,21 +104,36 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers { val hashContents = redisHashRDD.sortByKey().collect val wcnts = contentWords.map((_, 1)).groupBy(_._1). map(x => (x._1, x._2.map(_._2).sum.toString)).toArray.sortBy(_._1) + val wrongTypeKeysRes = List(zSetKey, setKey, listKey, contentWords(0)).map(sc.fromRedisHash(_).collect) + val missingKeyRes = sc.fromRedisHash(missingRedisKey).collect() + hashContents should be(wcnts) + all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisListRDD") { val redisListRDD = sc.fromRedisList(listKey) val listContents = redisListRDD.sortBy(x => x).collect val ws = contentWords.sorted + val wrongTypeKeysRes = List(zSetKey, setKey, hashKey, contentWords(0)).map(sc.fromRedisList(_).collect) + val missingKeyRes = sc.fromRedisList(missingRedisKey).collect() + listContents should be(ws) + all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("RedisSetRDD") { val redisSetRDD = sc.fromRedisSet(setKey) val setContents = redisSetRDD.sortBy(x => x).collect val ws = content.split("\\W+").filter(!_.isEmpty).distinct.sorted + val wrongTypeKeysRes = List(zSetKey, listKey, hashKey, contentWords(0)).map(sc.fromRedisSet(_).collect) + val missingKeyRes = sc.fromRedisSet(missingRedisKey).collect() + setContents should be(ws) + all(wrongTypeKeysRes) should have length 0 + missingKeyRes should have length 0 } test("Expire") {