
Commit 68c8f2a

Merge branch 'master' into fix-issue-132
2 parents 4e27bed + 6aaaba2

34 files changed: +1166 -135 lines

README.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -34,6 +34,7 @@ This library is work in progress so the API may change before the official release
 - [RDD](doc/rdd.md)
 - [Dataframe](doc/dataframe.md)
 - [Streaming](doc/streaming.md)
+- [Structured Streaming](doc/structured-streaming.md)
 - [Cluster](doc/cluster.md)
 - [Java](doc/java.md)
 - [Python](doc/python.md)
```

doc/python.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -14,7 +14,7 @@ $ ./bin/pyspark --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar
 By default it connects to `localhost:6379` without any password, you can change the connection settings in the following manner:
 
 ```bash
-$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar --conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"
+$ bin/pyspark --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar --conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"
```
````
doc/structured-streaming.md

Lines changed: 114 additions & 0 deletions (new file)

### Structured Streaming

Spark-Redis supports the [Redis Stream](https://redis.io/topics/streams-intro) data structure as a source for [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

The following example reads data from a Redis Stream `sensors` that has two fields, `sensor-id` and `temperature`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .getOrCreate()

val sensors = spark
  .readStream
  .format("redis")                  // read from Redis
  .option("stream.keys", "sensors") // stream key
  .schema(StructType(Array(         // stream fields
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType)
  )))
  .load()

val query = sensors
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```

You can write the following items to the stream to test it (e.g. with `redis-cli`):

```
xadd sensors * sensor-id 1 temperature 28.1
xadd sensors * sensor-id 2 temperature 30.5
xadd sensors * sensor-id 1 temperature 28.3
```

### Stream Offset

By default spark-redis pulls messages starting from the latest message in the stream. If you need to start from a specific position in the stream, specify the `stream.offsets` parameter as a JSON string.
In the following example we set the offset id to `1548083485360-0`. The group name `spark-source` is the default consumer group that spark-redis automatically creates to read the stream (see "Other configuration" below).

```scala
val offsets = """{"offsets":{"sensors":{"groupName":"spark-source","offset":"1548083485360-0"}}}"""

...

.option("stream.offsets", offsets)
```

If you want to process the stream from the beginning, set the offset id to `0-0`.
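
For reference, here is a minimal sketch of how `stream.offsets` plugs into the full reader; it reuses the `sensors` schema and the `spark` session from the example above, with the default group name:

```scala
val offsets = """{"offsets":{"sensors":{"groupName":"spark-source","offset":"1548083485360-0"}}}"""

val sensors = spark
  .readStream
  .format("redis")
  .option("stream.keys", "sensors")
  .option("stream.offsets", offsets) // start reading from the given entry id
  .schema(StructType(Array(
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType)
  )))
  .load()
```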

### Entry id column

You can access the stream entry id by adding a `_id` column to the stream schema:

```scala
val sensors = spark
  .readStream
  .format("redis")
  .option("stream.keys", "sensors")
  .schema(StructType(Array(
    StructField("_id", StringType), // entry id
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType)
  )))
  .load()
```

The query output will look like this:

```
+---------------+---------+-----------+
|            _id|sensor-id|temperature|
+---------------+---------+-----------+
|1548083485360-0|        1|       28.1|
|1548083486248-0|        2|       30.5|
|1548083486944-0|        1|       28.3|
+---------------+---------+-----------+
```

### Level of Parallelism

By default spark-redis creates a consumer group with a single consumer. There are two ways to increase the level of parallelism.

The first approach is to create the stream from multiple Redis keys. You can specify multiple keys separated by commas, e.g.
`.option("stream.keys", "sensors-eu,sensors-us")`. In this case the data from each key is mapped to a Spark partition.
Please note that item ordering is preserved only within a particular Redis key (Spark partition); there are no ordering guarantees for items in different keys.

With the second approach you can read data from a single Redis key with multiple consumers in parallel, e.g. `.option("stream.parallelism", 4)`.
Each consumer is mapped to a Spark partition. There are no ordering guarantees in this case.
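
For instance, a minimal sketch of the second approach, reusing the `sensors` stream and schema from the first example:

```scala
val sensors = spark
  .readStream
  .format("redis")
  .option("stream.keys", "sensors")
  .option("stream.parallelism", 4) // four consumers read the key in parallel
  .schema(StructType(Array(
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType)
  )))
  .load()
```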

### Other configuration

Spark-redis automatically creates a consumer group with the name `spark-source` if it doesn't exist. You can customize the consumer group name with
`.option("stream.group.name", "my-group")`. You can also customize the names of the consumers in the group with `.option("stream.consumer.prefix", "my-consumer")`.

Other options you can configure are `stream.read.batch.size` and `stream.read.block`. They define the maximum number of items pulled and the time in milliseconds to wait in an `XREADGROUP` call.
The default values are 100 items and 500 ms.

```scala
.option("stream.read.batch.size", 200)
.option("stream.read.block", 1000)
```
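
Putting these options together, a sketch of a fully customized reader (the group and consumer names `my-group`/`my-consumer` are illustrative, not defaults):

```scala
val sensors = spark
  .readStream
  .format("redis")
  .option("stream.keys", "sensors")
  .option("stream.group.name", "my-group")         // custom consumer group name
  .option("stream.consumer.prefix", "my-consumer") // custom consumer name prefix
  .option("stream.read.batch.size", 200)           // max items per XREADGROUP call
  .option("stream.read.block", 1000)               // block timeout in ms
  .schema(StructType(Array(
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType)
  )))
  .load()
```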
Lines changed: 1 addition & 0 deletions (new file)

```
org.apache.spark.sql.redis.stream.RedisStreamProvider
```

src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala

Lines changed: 6 additions & 11 deletions

```diff
@@ -2,8 +2,8 @@ package com.redislabs.provider.redis.streaming
 
 import java.util.AbstractMap.SimpleEntry
 
-import com.redislabs.provider.redis.util.Logging
 import com.redislabs.provider.redis.util.PipelineUtils.foreachWithPipeline
+import com.redislabs.provider.redis.util.{Logging, StreamUtils}
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig}
 import org.apache.curator.utils.ThreadUtils
 import org.apache.spark.storage.StorageLevel
@@ -59,17 +59,12 @@
   }
 
   def createConsumerGroupIfNotExist(): Unit = {
-    try {
-      val entryId = conf.offset match {
-        case Earliest => new EntryID(0, 0)
-        case Latest => EntryID.LAST_ENTRY
-        case IdOffset(v1, v2) => new EntryID(v1, v2)
-      }
-      jedis.xgroupCreate(conf.streamKey, conf.groupName, entryId, true)
-    } catch {
-      case e: Exception =>
-        if (!e.getMessage.contains("already exists")) throw e
+    val entryId = conf.offset match {
+      case Earliest => new EntryID(0, 0)
+      case Latest => EntryID.LAST_ENTRY
+      case IdOffset(v1, v2) => new EntryID(v1, v2)
     }
+    StreamUtils.createConsumerGroupIfNotExist(jedis, conf.streamKey, conf.groupName, entryId)
   }
 
   def receiveUnacknowledged(): Unit = {
```
src/main/scala/com/redislabs/provider/redis/util/CollectionUtils.scala

Lines changed: 29 additions & 0 deletions (new file)

```scala
package com.redislabs.provider.redis.util

import scala.collection.IterableLike
import scala.collection.generic.CanBuildFrom

/**
  * @author The Viet Nguyen
  */
object CollectionUtils {

  implicit class RichCollection[A, Repr](val xs: IterableLike[A, Repr]) extends AnyVal {

    def distinctBy[B, That](f: A => B)(implicit cbf: CanBuildFrom[Repr, A, That]): That = {
      val builder = cbf(xs.repr)
      val iterator = xs.iterator
      var set = Set[B]()
      while (iterator.hasNext) {
        val element = iterator.next
        val distinctField = f(element)
        if (!set(distinctField)) {
          set += distinctField
          builder += element
        }
      }
      builder.result
    }
  }
}
```
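
A quick usage sketch of the new `distinctBy` helper; the `Reading` case class and its values are hypothetical, not part of the commit:

```scala
import com.redislabs.provider.redis.util.CollectionUtils.RichCollection

case class Reading(sensorId: String, temperature: Float)

val readings = Seq(Reading("1", 28.1f), Reading("2", 30.5f), Reading("1", 28.3f))

// keeps the first occurrence per key: Reading("1", 28.1) and Reading("2", 30.5)
val deduplicated = readings.distinctBy(_.sensorId)
```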
src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala

Lines changed: 69 additions & 3 deletions

```diff
@@ -1,15 +1,81 @@
 package com.redislabs.provider.redis.util
 
+import java.util.{List => JList}
+
+import com.redislabs.provider.redis.RedisConfig
+import com.redislabs.provider.redis.util.ConnectionUtils.XINFO.{SubCommandGroups, SubCommandStream}
 import redis.clients.jedis.Jedis
+import redis.clients.jedis.commands.ProtocolCommand
+import redis.clients.jedis.util.SafeEncoder
+
+import scala.collection.JavaConverters._
 
 /**
   * @author The Viet Nguyen
   */
 object ConnectionUtils {
 
   def withConnection[A](conn: Jedis)(body: Jedis => A): A = {
-    val res = body(conn)
-    conn.close()
-    res
+    try {
+      body(conn)
+    } finally {
+      conn.close()
+    }
+  }
+
+  def withConnection[A](streamKey: String)(body: Jedis => A)(implicit redisConfig: RedisConfig): A = {
+    withConnection(redisConfig.connectionForKey(streamKey)) {
+      body
+    }
+  }
+
+  implicit class JedisExt(val jedis: Jedis) extends AnyVal {
+
+    // TODO: temporary solution to get latest offset while not supported by Jedis
+    def xinfo(command: String, args: String*): Map[String, Any] = {
+      val client = jedis.getClient
+      val combinedArgs = command +: args
+      client.sendCommand(XINFO, combinedArgs: _*)
+      val response = asList(client.getOne).asScala
+      command match {
+        case SubCommandStream =>
+          asMap(response)
+        case SubCommandGroups =>
+          response.map(m => asList(m)).map(_.asScala).map(asMap)
+            .map(m => String.valueOf(m("name")) -> m).toMap
+      }
+    }
+
+    private def asMap(seq: Seq[Any]): Map[String, Any] = {
+      seq.grouped(2)
+        .map { group =>
+          val key = asString(group.head)
+          val value = group(1) match {
+            case arr: Array[Byte] => asString(arr)
+            case other: Any => other
+          }
+          key -> value
+        }.toMap
+    }
+
+    private def asList(any: Any): JList[Any] =
+      any.asInstanceOf[JList[Any]]
+
+    private def asString(any: Any): String =
+      new String(any.asInstanceOf[Array[Byte]])
+  }
+
+  object XINFO extends ProtocolCommand {
+
+    val SubCommandStream = "STREAM"
+    val SubCommandGroups = "GROUPS"
+
+    val LastGeneratedId = "last-generated-id"
+    val LastDeliveredId = "last-delivered-id"
+    val LastEntry = "last-entry"
+    val EntryId = "_id"
+
+    override def getRaw: Array[Byte] = SafeEncoder.encode("XINFO")
   }
+
 }
```
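
A usage sketch of the new loan-pattern helper and the `xinfo` extension; it is hypothetical (assumes a `RedisConfig` in scope) and reuses the `sensors` stream key from the docs above:

```scala
import com.redislabs.provider.redis.RedisConfig
import com.redislabs.provider.redis.util.ConnectionUtils
import com.redislabs.provider.redis.util.ConnectionUtils.{JedisExt, XINFO}

// look up the last generated id of the "sensors" stream;
// withConnection closes the connection even if the body throws
def lastGeneratedId(implicit redisConfig: RedisConfig): Any =
  ConnectionUtils.withConnection("sensors") { jedis =>
    jedis.xinfo(XINFO.SubCommandStream, "sensors")(XINFO.LastGeneratedId)
  }
```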
src/main/scala/com/redislabs/provider/redis/util/JsonUtils.scala

Lines changed: 16 additions & 0 deletions (new file)

```scala
package com.redislabs.provider.redis.util

import org.json4s.jackson.Serialization
import org.json4s.{Formats, NoTypeHints}

/**
  * @author The Viet Nguyen
  */
object JsonUtils {

  private implicit val formats: Formats = Serialization.formats(NoTypeHints)

  def toJson(any: AnyRef): String = {
    Serialization.write(any)
  }
}
```
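
A minimal sketch of `toJson` (the `SensorEvent` case class is hypothetical, not part of the commit):

```scala
import com.redislabs.provider.redis.util.JsonUtils

case class SensorEvent(sensorId: String, temperature: Float)

val json = JsonUtils.toJson(SensorEvent("1", 28.1f))
// => {"sensorId":"1","temperature":28.1}
```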
src/main/scala/com/redislabs/provider/redis/util/ParseUtils.scala

Lines changed: 40 additions & 0 deletions (new file)

```scala
package com.redislabs.provider.redis.util

import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}

import org.apache.spark.sql.types._

/**
  * @author The Viet Nguyen
  */
object ParseUtils {

  def parseFields(value: Map[String, String], schema: StructType): Array[Any] =
    schema.fields.map { field =>
      val fieldName = field.name
      val fieldValue = value(fieldName)
      parseValue(field.dataType, fieldValue)
    }

  private def parseValue(dataType: DataType, fieldValueStr: String): Any = {
    if (fieldValueStr == null) {
      null
    } else {
      parseNotNullValue(dataType, fieldValueStr)
    }
  }

  private def parseNotNullValue(dataType: DataType, fieldValueStr: String): Any =
    dataType match {
      case ByteType => JByte.parseByte(fieldValueStr)
      case IntegerType => Integer.parseInt(fieldValueStr)
      case LongType => JLong.parseLong(fieldValueStr)
      case FloatType => JFloat.parseFloat(fieldValueStr)
      case DoubleType => JDouble.parseDouble(fieldValueStr)
      case BooleanType => JBoolean.parseBoolean(fieldValueStr)
      case ShortType => JShort.parseShort(fieldValueStr)
      case DateType => java.sql.Date.valueOf(fieldValueStr)
      case TimestampType => java.sql.Timestamp.valueOf(fieldValueStr)
      case _ => fieldValueStr
    }
}
```
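
A sketch of how `parseFields` converts the string fields of a stream entry into typed values, reusing the `sensors` schema from the docs (the input map is hypothetical):

```scala
import com.redislabs.provider.redis.util.ParseUtils
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("sensor-id", StringType),
  StructField("temperature", FloatType)
))

val entryFields = Map("sensor-id" -> "1", "temperature" -> "28.1")

// => Array("1", 28.1f): StringType fields stay strings, FloatType fields are parsed
val row = ParseUtils.parseFields(entryFields, schema)
```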
src/main/scala/com/redislabs/provider/redis/util/StreamUtils.scala

Lines changed: 28 additions & 0 deletions (new file)

```scala
package com.redislabs.provider.redis.util

import org.apache.commons.lang3.StringUtils
import redis.clients.jedis.{EntryID, Jedis}

/**
  * @author The Viet Nguyen
  */
object StreamUtils extends Logging {

  val EntryIdEarliest = new EntryID(0, 0)

  def createConsumerGroupIfNotExist(conn: Jedis, streamKey: String, groupName: String,
                                    offset: EntryID): Unit = {
    try {
      conn.xgroupCreate(streamKey, groupName, offset, true)
    } catch {
      case e: Exception if StringUtils.contains(e.getMessage, "already exists") =>
        logInfo(s"Consumer group already exists: $groupName")
    }
  }

  def resetConsumerGroup(conn: Jedis, streamKey: String, groupName: String,
                         offset: EntryID): Unit = {
    logInfo(s"Setting consumer group $groupName id to $offset")
    conn.xgroupSetID(streamKey, groupName, offset)
  }
}
```
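
A sketch of the helper in isolation; the connection parameters are assumptions (in the library the `Jedis` connection comes from `RedisConfig`):

```scala
import com.redislabs.provider.redis.util.StreamUtils
import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379)
try {
  // idempotent: logs and continues if the group already exists
  StreamUtils.createConsumerGroupIfNotExist(jedis, "sensors", "spark-source", StreamUtils.EntryIdEarliest)
} finally {
  jedis.close()
}
```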
