
#304: added toRedisByteHASHes function #306


Merged 2 commits on Aug 25, 2021
1 change: 1 addition & 0 deletions doc/configuration.md
@@ -10,6 +10,7 @@ topology from the initial node, so there is no need to provide the rest of the c
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
* `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
* `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
* `spark.redis.rdd.write.iterator.grouping.size` - applies to RDD write operations; the number of items grouped into a single batch when iterating over the underlying RDD partition. The default value is 1000 (see the example below this list).
* `spark.redis.ssl` - set to true to use tls
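
For illustration, a minimal sketch of setting the new option alongside the other `spark.redis` settings on a `SparkConf`; the application name, host, and port are placeholder values, not part of this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder connection settings; the last .set(...) line is the option added in this PR.
val conf = new SparkConf()
  .setAppName("spark-redis-write")
  .set("spark.redis.host", "localhost")
  .set("spark.redis.port", "6379")
  .set("spark.redis.max.pipeline.size", "100")
  .set("spark.redis.rdd.write.iterator.grouping.size", "1000")
val sc = new SparkContext(conf)
```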


6 changes: 6 additions & 0 deletions doc/rdd.md
@@ -145,6 +145,12 @@ sc.toRedisHASHes(hashRDD, ttl)

The `hashRDD` is a rdd of tuples (`hashname`, `map[field name, field value]`)

```scala
sc.toRedisByteHASHes(hashRDD, ttl)
```

The `hashRDD` is an RDD of tuples (`hashname`, `map[field name, field value]`) where the hash name, field names, and field values are byte arrays.
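
For context, a minimal sketch of building such an RDD from string data (assuming the `import com.redislabs.provider.redis._` import used for the other examples, and converting strings with `getBytes`):

```scala
// Convert string hash names, field names, and values to byte arrays before writing.
val byteHashRDD = sc.parallelize(Seq(
  ("hash1".getBytes, Map("field1".getBytes -> "value1".getBytes)),
  ("hash2".getBytes, Map("field2".getBytes -> "value2".getBytes))
))
sc.toRedisByteHASHes(byteHashRDD, ttl = 0) // ttl = 0 stores the hashes without expiration
```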

#### Lists
Use the following to store an RDD in a Redis List:

11 changes: 8 additions & 3 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -93,7 +93,7 @@ case class RedisNode(endpoint: RedisEndpoint,
/**
* Tuning options for read and write operations.
*/
case class ReadWriteConfig(scanCount: Int, maxPipelineSize: Int)
case class ReadWriteConfig(scanCount: Int, maxPipelineSize: Int, rddWriteIteratorGroupingSize: Int)

object ReadWriteConfig {
/** maximum number of commands per pipeline **/
@@ -104,12 +104,17 @@ object ReadWriteConfig {
val ScanCountConfKey = "spark.redis.scan.count"
val ScanCountDefault = 100

val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault)
/** Iterator grouping size for writing RDD **/
val RddWriteIteratorGroupingSizeKey = "spark.redis.rdd.write.iterator.grouping.size"
val RddWriteIteratorGroupingSizeDefault = 1000

val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault, RddWriteIteratorGroupingSizeDefault)

def fromSparkConf(conf: SparkConf): ReadWriteConfig = {
ReadWriteConfig(
conf.getInt(ScanCountConfKey, ScanCountDefault),
conf.getInt(MaxPipelineSizeConfKey, MaxPipelineSizeDefault)
conf.getInt(MaxPipelineSizeConfKey, MaxPipelineSizeDefault),
conf.getInt(RddWriteIteratorGroupingSizeKey, RddWriteIteratorGroupingSizeDefault)
)
}
}
79 changes: 61 additions & 18 deletions src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -266,18 +266,31 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
}

/**
* Write RDD of (hash name, hash KVs)
*
* @param kvs RDD of tuples (hash name, Map(hash field name, hash field value))
* @param ttl time to live
*/
* Write RDD of (hash name, hash KVs)
*
* @param kvs RDD of tuples (hash name, Map(hash field name, hash field value))
* @param ttl time to live
*/
def toRedisHASHes(kvs: RDD[(String, Map[String, String])], ttl: Int = 0)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
kvs.foreachPartition(partition => setHash(partition, ttl, redisConfig, readWriteConfig))
}

/**
* Write RDD of (hash name, hash KVs). Hash names, field names, and field values are represented as byte arrays.
*
* @param kvs RDD of tuples (hash name, Map(hash field name, hash field value))
* @param ttl time to live
*/
def toRedisByteHASHes(kvs: RDD[(Array[Byte], Map[Array[Byte], Array[Byte]])], ttl: Int = 0)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
kvs.foreachPartition(partition => setByteHash(partition, ttl, redisConfig, readWriteConfig))
}

/**
* @param kvs Pair RDD of K/V
* @param zsetName target zset's name which hold all the kvs
@@ -322,16 +335,15 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
*/
def toRedisLISTs(rdd: RDD[(String, Seq[String])], ttl: Int = 0)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
rdd.foreachPartition(partition => setList(partition, ttl, redisConfig, readWriteConfig))
}

/**
* Write RDD of binary values to Redis Lists.
*
* @deprecated use toRedisByteLISTs, the method name has changed to make API consistent
*
* @param rdd RDD of tuples (list name, list values)
* @param ttl time to live
*/
@@ -350,9 +362,9 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
* @param ttl time to live
*/
def toRedisByteLISTs(rdd: RDD[(Array[Byte], Seq[Array[Byte]])], ttl: Int = 0)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
rdd.foreachPartition(partition => setByteList(partition, ttl, redisConfig, readWriteConfig))
}

@@ -416,10 +428,10 @@ object RedisContext extends Serializable {
}

/**
* @param hashes hashName: map of k/vs to be saved in the target host
* @param ttl time to live
*/
def setHash(hashes: Iterator[(String, Map[String,String])],
* @param hashes hashName: map of k/vs to be saved in the target host
* @param ttl time to live
*/
def setHash(hashes: Iterator[(String, Map[String, String])],
ttl: Int,
redisConfig: RedisConfig,
readWriteConfig: ReadWriteConfig) {
@@ -442,6 +454,37 @@ }
}
}

/**
* @param hashes hashName: map of k/vs to be saved in the target host
* @param ttl time to live
*/
def setByteHash(hashes: Iterator[(Array[Byte], Map[Array[Byte], Array[Byte]])],
ttl: Int,
redisConfig: RedisConfig,
readWriteConfig: ReadWriteConfig) {
implicit val rwConf: ReadWriteConfig = readWriteConfig

hashes
.map { case (key, hashFields) =>
(redisConfig.getHost(key), (key, hashFields))
}
.grouped(readWriteConfig.rddWriteIteratorGroupingSize)
.foreach { batch =>
batch
.toArray
.groupBy(_._1)
.foreach { case (node, arr) =>
withConnection(node.endpoint.connect()) { conn =>
foreachWithPipeline(conn, arr) { (pipeline, a) =>
val (key, hashFields) = a._2
pipeline.hmset(key, hashFields)
if (ttl > 0) pipeline.expire(key, ttl)
}
}
}
}
}

/**
* @param zsetName
* @param arr k/vs which should be saved in the target host
3 changes: 3 additions & 0 deletions src/main/scala/org/apache/spark/sql/redis/redis.scala
@@ -7,6 +7,9 @@ package object redis {

val RedisFormat = "org.apache.spark.sql.redis"

val RddWriteIteratorGroupingSize = "rdd.write.iterator.grouping.size"
val RddWriteIteratorGroupingSizeDefault = 1000

val SqlOptionFilterKeysByType = "filter.keys.by.type"
val SqlOptionNumPartitions = "partitions.number"
/**
@@ -56,6 +56,21 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {
verifyHash("hash2", map2)
}

test("toRedisByteHASHes") {
val map1 = Map("k1" -> "v1", "k2" -> "v2")
val map2 = Map("k3" -> "v3", "k4" -> "v4")
val hashes = Seq(
("hash1", map1),
("hash2", map2)
)
val hashesBytes = hashes.map { case (k, hash) => (k.getBytes, hash.map { case (mapKey, mapVal) => (mapKey.getBytes, mapVal.getBytes) }) }
val rdd = sc.parallelize(hashesBytes)
sc.toRedisByteHASHes(rdd)

verifyHash("hash1", map1)
verifyHash("hash2", map2)
}

def verifyList(list: String, vals: Seq[String]): Unit = {
withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
conn.lrange(list, 0, vals.size).asScala should be(vals.toList)