From 85eca95a3552462f541c4c1b1128a1e5c75c8a16 Mon Sep 17 00:00:00 2001
From: Phyo Kyaw
Date: Mon, 14 Sep 2020 18:17:58 -0700
Subject: [PATCH] added toRedisHASHes function (#49)

---
 doc/rdd.md                                      |  8 ++++
 .../provider/redis/redisFunctions.scala         | 41 +++++++++++++++++++
 .../redis/rdd/RedisRddExtraSuite.scala          | 20 +++++++++
 3 files changed, 69 insertions(+)

diff --git a/doc/rdd.md b/doc/rdd.md
index 5da4ea61..dac66c8c 100644
--- a/doc/rdd.md
+++ b/doc/rdd.md
@@ -137,6 +137,14 @@ sc.toRedisHASH(hashRDD, hashName, ttl)
 ```
 
 By default, Hashes won't have any expiry set.
 
+Use the following to store an RDD into multiple hashes:
+
+```scala
+sc.toRedisHASHes(hashRDD, ttl)
+```
+
+The `hashRDD` is an RDD of tuples (`hashname`, `map[field name, field value]`).
+
 #### Lists
 Use the following to store an RDD in a Redis List:
diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index d1e7a22c..873008fa 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -5,6 +5,7 @@ import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
 import com.redislabs.provider.redis.util.PipelineUtils._
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /**
   * RedisContext extends sparkContext's functionality with redis functions
@@ -264,6 +265,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig, readWriteConfig))
   }
 
+  /**
+    * Write RDD of (hash name, hash KVs)
+    *
+    * @param kvs RDD of tuples (hash name, Map(hash field name, hash field value))
+    * @param ttl time to live
+    */
+  def toRedisHASHes(kvs: RDD[(String, Map[String, String])], ttl: Int = 0)
+                   (implicit
+                    redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
+                    readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
+    kvs.foreachPartition(partition => setHash(partition, ttl, redisConfig, readWriteConfig))
+  }
+
   /**
     * @param kvs      Pair RDD of K/V
     * @param zsetName target zset's name which hold all the kvs
@@ -401,6 +415,33 @@ object RedisContext extends Serializable {
     conn.close()
   }
 
+  /**
+    * @param hashes hashName: map of k/vs to be saved in the target host
+    * @param ttl    time to live
+    */
+  def setHash(hashes: Iterator[(String, Map[String, String])],
+              ttl: Int,
+              redisConfig: RedisConfig,
+              readWriteConfig: ReadWriteConfig) {
+    implicit val rwConf: ReadWriteConfig = readWriteConfig
+
+    hashes
+      .map { case (key, hashFields) =>
+        (redisConfig.getHost(key), (key, hashFields))
+      }
+      .toArray
+      .groupBy(_._1)
+      .foreach { case (node, arr) =>
+        withConnection(node.endpoint.connect()) { conn =>
+          foreachWithPipeline(conn, arr) { (pipeline, a) =>
+            val (key, hashFields) = a._2
+            pipeline.hmset(key, hashFields)
+            if (ttl > 0) pipeline.expire(key, ttl)
+          }
+        }
+      }
+  }
+
   /**
     * @param zsetName
     * @param arr k/vs which should be saved in the target host
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
index dc0fe381..4d8a93a6 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
@@ -42,10 +42,30 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {
     verifyList("list2", list2)
   }
 
+  test("toRedisHASHes") {
+    val map1 = Map("k1" -> "v1", "k2" -> "v2")
+    val map2 = Map("k3" -> "v3", "k4" -> "v4")
+    val hashes = Seq(
+      ("hash1", map1),
+      ("hash2", map2)
+    )
+    val rdd = sc.parallelize(hashes)
+    sc.toRedisHASHes(rdd)
+
+    verifyHash("hash1", map1)
+    verifyHash("hash2", map2)
+  }
+
   def verifyList(list: String, vals: Seq[String]): Unit = {
     withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
       conn.lrange(list, 0, vals.size).asScala should be(vals.toList)
     }
   }
+
+  def verifyHash(hash: String, vals: Map[String, String]): Unit = {
+    withConnection(redisConfig.getHost(hash).endpoint.connect()) { conn =>
+      conn.hgetAll(hash).asScala should be(vals)
+    }
+  }
+
 }
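For reference, a minimal end-to-end sketch of how the new `toRedisHASHes` call might be used from a Spark application. This is illustrative only: it assumes a Redis instance reachable at `localhost:6379` through the standard `spark.redis.host`/`spark.redis.port` settings, and the key names, field values, and TTL are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._ // brings toRedisHASHes onto SparkContext via the implicit RedisContext

object ToRedisHashesExample {
  def main(args: Array[String]): Unit = {
    // Assumed connection settings; adjust to your Redis deployment.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("toRedisHASHes-example")
      .set("spark.redis.host", "localhost")
      .set("spark.redis.port", "6379")
    val sc = new SparkContext(conf)

    // Each element is (hash name, Map(field -> value)), the shape toRedisHASHes expects.
    val hashRDD = sc.parallelize(Seq(
      ("user:1", Map("name" -> "alice", "age" -> "30")),
      ("user:2", Map("name" -> "bob", "age" -> "25"))
    ))

    // Writes both hashes; a positive ttl (in seconds) sets an expiry on each hash key.
    sc.toRedisHASHes(hashRDD, ttl = 60)

    sc.stop()
  }
}
```

Because `redisConfig` and `readWriteConfig` are implicit parameters with defaults derived from the SparkConf, no extra wiring is needed beyond the `spark.redis.*` settings; a custom `RedisConfig` can be supplied implicitly to target a different cluster.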