
Commit cd30036

Merge pull request #269 from phyok/toRedisHASHes
Issue 49: added toRedisHASHes function (#49)
2 parents: 7223a63 + 85eca95

File tree: 3 files changed, 69 insertions (+), 0 deletions (−)


doc/rdd.md

Lines changed: 8 additions & 0 deletions
````diff
@@ -137,6 +137,14 @@ sc.toRedisHASH(hashRDD, hashName, ttl)
 
 By default, Hashes won't have any expiry set.
 
+Use the following to store an RDD into multiple hashes:
+
+```scala
+sc.toRedisHASHes(hashRDD, ttl)
+```
+
+The `hashRDD` is an RDD of tuples (`hashname`, `map[field name, field value]`).
+
 #### Lists
 Use the following to store an RDD in a Redis List:
````
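For reference, a minimal end-to-end sketch of the documented call. The hash names, field values, and the 60-second TTL below are illustrative assumptions, not part of this commit; `sc` is assumed to be a live `SparkContext`:

```scala
import com.redislabs.provider.redis._

// Each element is (hash name, Map(field -> value)); one Redis hash is written per element.
val hashRDD = sc.parallelize(Seq(
  ("user:1", Map("name" -> "Alice", "age" -> "30")),
  ("user:2", Map("name" -> "Bob", "age" -> "25"))
))

// ttl = 60 expires both hashes after a minute; ttl = 0 (the default) stores them without expiry.
sc.toRedisHASHes(hashRDD, ttl = 60)
```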

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 41 additions & 0 deletions
```diff
@@ -5,6 +5,7 @@ import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
 import com.redislabs.provider.redis.util.PipelineUtils._
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /**
   * RedisContext extends sparkContext's functionality with redis functions
@@ -264,6 +265,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig, readWriteConfig))
   }
 
+  /**
+    * Write RDD of (hash name, hash KVs)
+    *
+    * @param kvs RDD of tuples (hash name, Map(hash field name, hash field value))
+    * @param ttl time to live
+    */
+  def toRedisHASHes(kvs: RDD[(String, Map[String, String])], ttl: Int = 0)
+                   (implicit
+                    redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
+                    readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
+    kvs.foreachPartition(partition => setHash(partition, ttl, redisConfig, readWriteConfig))
+  }
+
   /**
     * @param kvs Pair RDD of K/V
     * @param zsetName target zset's name which hold all the kvs
@@ -401,6 +415,33 @@ object RedisContext extends Serializable {
     conn.close()
   }
 
+  /**
+    * @param hashes hashName: map of k/vs to be saved in the target host
+    * @param ttl time to live
+    */
+  def setHash(hashes: Iterator[(String, Map[String, String])],
+              ttl: Int,
+              redisConfig: RedisConfig,
+              readWriteConfig: ReadWriteConfig) {
+    implicit val rwConf: ReadWriteConfig = readWriteConfig
+
+    hashes
+      .map { case (key, hashFields) =>
+        (redisConfig.getHost(key), (key, hashFields))
+      }
+      .toArray
+      .groupBy(_._1)
+      .foreach { case (node, arr) =>
+        withConnection(node.endpoint.connect()) { conn =>
+          foreachWithPipeline(conn, arr) { (pipeline, a) =>
+            val (key, hashFields) = a._2
+            pipeline.hmset(key, hashFields)
+            if (ttl > 0) pipeline.expire(key, ttl)
+          }
+        }
+      }
+  }
+
   /**
     * @param zsetName
     * @param arr k/vs which should be saved in the target host
```
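The new `toRedisHASHes` delegates to a `setHash` overload that groups the incoming tuples by the node that owns each key (`redisConfig.getHost`), then writes each group over one pipelined connection using `HMSET` plus an optional `EXPIRE`; the `JavaConversions` import is what lets a Scala `Map` be passed to Jedis's `hmset`, which expects a `java.util.Map`. Both implicit parameters default to values read from the `SparkConf`, but they can also be supplied explicitly. A hedged sketch, assuming a standalone Redis reachable at `localhost:6379`:

```scala
import com.redislabs.provider.redis._

// Override the implicit defaults instead of relying on spark.redis.* entries in the SparkConf.
// The endpoint is an assumption for illustration, not something this commit configures.
implicit val redisConfig: RedisConfig = new RedisConfig(RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

sc.toRedisHASHes(hashRDD, ttl = 0)
```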

src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -42,10 +42,30 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {
     verifyList("list2", list2)
   }
 
+  test("toRedisHASHes") {
+    val map1 = Map("k1" -> "v1", "k2" -> "v2")
+    val map2 = Map("k3" -> "v3", "k4" -> "v4")
+    val hashes = Seq(
+      ("hash1", map1),
+      ("hash2", map2)
+    )
+    val rdd = sc.parallelize(hashes)
+    sc.toRedisHASHes(rdd)
+
+    verifyHash("hash1", map1)
+    verifyHash("hash2", map2)
+  }
+
   def verifyList(list: String, vals: Seq[String]): Unit = {
     withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
       conn.lrange(list, 0, vals.size).asScala should be(vals.toList)
     }
   }
 
+  def verifyHash(hash: String, vals: Map[String, String]): Unit = {
+    withConnection(redisConfig.getHost(hash).endpoint.connect()) { conn =>
+      conn.hgetAll(hash).asScala should be(vals)
+    }
+  }
+
 }
```
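The suite exercises only the no-expiry path. A sketch of a companion assertion for the TTL branch, reusing the trait's `withConnection` and `redisConfig` helpers; the `hash3` key and the 30-second TTL are hypothetical, not part of this commit:

```scala
test("toRedisHASHes with ttl") {
  val rdd = sc.parallelize(Seq(("hash3", Map("k" -> "v"))))
  sc.toRedisHASHes(rdd, ttl = 30)

  // Redis should report a positive remaining TTL for the freshly written hash.
  withConnection(redisConfig.getHost("hash3").endpoint.connect()) { conn =>
    conn.ttl("hash3").longValue() should be > 0L
  }
}
```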
