
Commit 0acb087

Merge pull request #116 from RedisLabs/stream
#115 Spark Streaming: Redis Stream data structure
2 parents: 374de3c + e22407a

17 files changed: +566 −16 lines

.travis.yml

Lines changed: 4 additions & 3 deletions
```diff
@@ -2,10 +2,11 @@ sudo: required
 language: scala
 scala:
   - 2.11.2
-before_install:
-  - git clone https://github.com/antirez/redis.git redis_for_spark-redis_test || true
 install:
-  - make -C redis_for_spark-redis_test -j4
+  - wget http://download.redis.io/releases/redis-5.0.1.tar.gz
+  - tar -xzvf redis-5.0.1.tar.gz
+  - make -C redis-5.0.1 -j4
+  - export PATH=$PWD/redis-5.0.1/src:$PATH
 script: make test
 cache:
   directories:
```

doc/streaming.md

Lines changed: 113 additions & 4 deletions
````diff
@@ -1,6 +1,115 @@
 ### Streaming
-Spark-Redis support streaming data from Redis instance/cluster, currently streaming data are fetched from Redis' List by the `blpop` command. Users are required to provide an array which stores all the List names they are interested in. The [storageLevel](http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization) is `MEMORY_AND_DISK_SER_2` by default, you can change it on your demand.
-`createRedisStream` will create a `(listName, value)` stream, but if you don't care about which list feeds the value, you can use `createRedisStreamWithoutListname` to get the only `value` stream.
+
+Spark-Redis supports streaming data from the Stream and List data structures:
+
+- [Redis Stream](#redis-stream)
+- [Redis List](#redis-list)
+
+
+## Redis Stream
+
+To stream data from a [Redis Stream](https://redis.io/topics/streams-intro), use the `createRedisXStream` method:
+
+```scala
+import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.streaming.{ConsumerConfig, StreamItem}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+val spark = SparkSession.builder.appName("Redis Stream Example")
+  .master("local[*]")
+  .config("spark.redis.host", "localhost")
+  .config("spark.redis.port", "6379")
+  .getOrCreate()
+
+val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
+
+val stream = ssc.createRedisXStream(Seq(ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-1")))
+stream.print()
+
+ssc.start()
+ssc.awaitTermination()
+
+```
+
+It will automatically create the consumer group if it doesn't exist and start listening for messages in the stream.
+
+### Stream Offset
+
+By default it pulls messages starting from the latest message. If you need to start from the earliest message or any specific position in the stream, specify the `offset` parameter:
+
+```scala
+ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-1", offset = Earliest) // start from '0-0'
+ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-1", IdOffset(42, 0)) // start from '42-0'
+```
+
+Please note that spark-redis will attempt to create a consumer group with the specified offset, but if the consumer group already exists,
+it will use the existing offset. This means that if you decide to re-process all the messages from the beginning,
+changing the offset to `Earliest` may not be enough. You may need to either manually delete the consumer
+group with `XGROUP DESTROY` or modify the offset with `XGROUP SETID`.
+
+### Receiver reliability
+
+The DStream is implemented with a [Reliable Receiver](https://spark.apache.org/docs/latest/streaming-custom-receivers.html#receiver-reliability) that acknowledges
+after the data has been stored in Spark. As with any other Receiver, to achieve strong fault-tolerance guarantees and ensure zero data loss, you have to enable [write-ahead logs](https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications) and checkpointing.
+
+The received data is stored with `StorageLevel.MEMORY_AND_DISK_2` by default.
+The storage level can be configured with the `storageLevel` parameter, e.g.:
+```scala
+ssc.createRedisXStream(conf, storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
+```
+
+### Level of Parallelism
+
+`createRedisXStream()` takes a sequence of consumer configs; each consumer is started in a separate thread. This allows you, for example, to
+create a stream from multiple Redis Stream keys:
+
+```scala
+ssc.createRedisXStream(Seq(
+  ConsumerConfig("my-stream-1", "my-consumer-group-1", "my-consumer-1"),
+  ConsumerConfig("my-stream-2", "my-consumer-group-2", "my-consumer-1")
+))
+```
+
+In this example we created an input DStream that corresponds to a single receiver running in a Spark executor. The receiver creates two threads pulling
+data from the streams in parallel. However, if data receiving becomes a bottleneck, you may want to start multiple receivers in different executors (worker machines).
+This can be achieved by creating multiple input DStreams and `union`ing them together. You can read more about it [here](https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving).
+
+For example, the following will create two receivers pulling the data from `my-stream` and balancing the load:
+
+```scala
+val streams = Seq(
+  ssc.createRedisXStream(Seq(ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-1"))),
+  ssc.createRedisXStream(Seq(ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-2")))
+)
+
+val stream = ssc.union(streams)
+stream.print()
+```
+
+### Configuration
+
+If the cluster resources are not large enough to process data as fast as it is being received, the receiving rate can be limited:
+
+```scala
+ConsumerConfig("stream", "group", "c-1", rateLimitPerConsumer = Some(100)) // 100 items per second
+```
+
+It defines the number of received items per second per consumer.
+
+Other options you can configure are `batchSize` and `block`. They define the maximum number of pulled items and the time in milliseconds to wait in an `XREADGROUP` call.
+
+```scala
+ConsumerConfig("stream", "group", "c-1", batchSize = 50, block = 200)
+```
+
+
+## Redis List
+
+The stream can also be created from a Redis List; the data is fetched with the `blpop` command. Users are required to provide an array which stores all the List names they are interested in. The [storageLevel](http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization) is `MEMORY_AND_DISK_SER_2` by default; you can change it as needed.
+
+The method `createRedisStream` will create a `(listName, value)` stream, but if you don't care about which list feeds the value, you can use `createRedisStreamWithoutListname` to get a stream of only the values.
 
 Use the following to get a `(listName, value)` stream from `foo` and `bar` list
 
@@ -10,7 +119,7 @@ import org.apache.spark.storage.StorageLevel
 import com.redislabs.provider.redis._
 val ssc = new StreamingContext(sc, Seconds(1))
 val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
-redisStream.print
+redisStream.print()
 ssc.awaitTermination()
 ```
 
@@ -23,6 +132,6 @@ import org.apache.spark.storage.StorageLevel
 import com.redislabs.provider.redis._
 val ssc = new StreamingContext(sc, Seconds(1))
 val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
-redisStream.print
+redisStream.print()
 ssc.awaitTermination()
 ```
````
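
The re-processing note above mentions `XGROUP DESTROY` and `XGROUP SETID` without showing them. As a rough illustration, here is a minimal sketch of resetting a group's offset through Jedis; the `xgroupSetID` call and the `StreamEntryID` type are assumed to be available in the Jedis 3 snapshot this PR depends on, and the stream, group, and host names are placeholders:

```scala
import redis.clients.jedis.{Jedis, StreamEntryID}

// Move the consumer group's last-delivered id back to '0-0' so that the
// next XREADGROUP call re-reads the stream from the beginning.
val jedis = new Jedis("localhost", 6379)
jedis.xgroupSetID("my-stream", "my-consumer-group", new StreamEntryID(0, 0))
jedis.close()
```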
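
Since the receiver-reliability section stops at pointing to the Spark docs, here is a minimal driver sketch that wires in the write-ahead log and checkpointing around `createRedisXStream`. The checkpoint path and stream names are placeholders; `spark.streaming.receiver.writeAheadLog.enable` and `ssc.checkpoint` are standard Spark Streaming settings:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.streaming.ConsumerConfig

val conf = new SparkConf()
  .setAppName("redis-xstream-reliable")
  .setMaster("local[*]")
  .set("spark.redis.host", "localhost")
  .set("spark.redis.port", "6379")
  // persist received blocks to a write-ahead log for zero data loss
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// use a fault-tolerant location (e.g. HDFS) in production
ssc.checkpoint("/tmp/redis-xstream-checkpoint")

val stream = ssc.createRedisXStream(Seq(ConsumerConfig("my-stream", "my-consumer-group", "my-consumer-1")))
stream.print()

ssc.start()
ssc.awaitTermination()
```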

pom.xml

Lines changed: 16 additions & 2 deletions
```diff
@@ -49,7 +49,7 @@
     <java.version>1.8</java.version>
     <scala.major.version>2.11</scala.major.version>
     <scala.complete.version>${scala.major.version}.12</scala.complete.version>
-    <jedis.version>2.9.0</jedis.version>
+    <jedis.version>3.0.0-20181113.105826-9</jedis.version>
     <spark.version>2.3.1</spark.version>
     <plugins.scalatest.version>1.0</plugins.scalatest.version>
   </properties>
@@ -65,6 +65,20 @@
     </repository>
   </distributionManagement>
 
+  <!-- TODO: temporal to get jedis SNAPSHOT -->
+  <repositories>
+    <repository>
+      <id>oss.sonatype.org-snapshot</id>
+      <url>http://oss.sonatype.org/content/repositories/snapshots</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
   <build>
     <plugins>
       <plugin>
@@ -258,7 +272,7 @@
       <version>2.0</version>
     </dependency>
     <dependency>
-      <groupId>redis.clients</groupId>
+      <groupId>com.redislabs</groupId>
       <artifactId>jedis</artifactId>
       <version>${jedis.version}</version>
       <type>jar</type>
```
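
For anyone consuming this snapshot from an sbt build instead of Maven, a hypothetical equivalent of the repository and dependency changes above might look like the following; the `3.0.0-SNAPSHOT` coordinate is an assumption standing in for the timestamped snapshot pinned in the pom:

```scala
// build.sbt — sketch only, mirroring the pom.xml changes above
resolvers += "oss.sonatype.org-snapshot" at "https://oss.sonatype.org/content/repositories/snapshots"

// the groupId moved from redis.clients to com.redislabs for the snapshot build
libraryDependencies += "com.redislabs" % "jedis" % "3.0.0-SNAPSHOT"
```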

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -3,8 +3,9 @@ package com.redislabs.provider.redis
 import java.net.URI
 
 import org.apache.spark.SparkConf
+import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
 import redis.clients.jedis.{Jedis, Protocol}
-import redis.clients.util.{JedisURIHelper, SafeEncoder, JedisClusterCRC16}
+
 import scala.collection.JavaConversions._
 
 
```
src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -7,8 +7,8 @@ import com.redislabs.provider.redis.util.PipelineUtils.mapWithPipeline
 import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisNode}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import redis.clients.jedis._
-import redis.clients.util.JedisClusterCRC16
+import redis.clients.jedis.{Jedis, ScanParams}
+import redis.clients.jedis.util.JedisClusterCRC16
 
 import scala.collection.JavaConversions._
 import scala.reflect.{ClassTag, classTag}
@@ -408,7 +408,7 @@ trait Keys {
     do {
       val scan = jedis.scan(cursor, params)
       keys.addAll(scan.getResult)
-      cursor = scan.getStringCursor
+      cursor = scan.getCursor
     } while (cursor != "0")
     keys
   }
```
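
The only behavioral touch point here is the Jedis 3 rename of the cursor accessor (`getStringCursor` → `getCursor`). For reference, a self-contained sketch of the same SCAN loop against a local Redis, assuming the Jedis 3 API used above:

```scala
import redis.clients.jedis.{Jedis, ScanParams}

val jedis = new Jedis("localhost", 6379)
val params = new ScanParams().`match`("*").count(100)
val keys = new java.util.HashSet[String]()
var cursor = ScanParams.SCAN_POINTER_START // "0"
do {
  // each SCAN call returns a page of keys plus the cursor for the next call
  val scan = jedis.scan(cursor, params)
  keys.addAll(scan.getResult)
  cursor = scan.getCursor // Jedis 3.x; this was getStringCursor in 2.x
} while (cursor != "0")
jedis.close()
```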

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 10 additions & 1 deletion
```diff
@@ -1,12 +1,13 @@
 package com.redislabs.provider.redis
 
-import com.redislabs.provider.redis.streaming.RedisInputDStream
+import com.redislabs.provider.redis.streaming.{ConsumerConfig, RedisInputDStream, RedisStreamReceiver, StreamItem}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import com.redislabs.provider.redis.rdd._
 import com.redislabs.provider.redis.util.PipelineUtils._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
 
 /**
   * RedisContext extends sparkContext's functionality with redis functions
@@ -456,6 +457,14 @@ class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serial
                         (implicit redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)) = {
     new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[String])
   }
+
+  def createRedisXStream(consumersConfig: Seq[ConsumerConfig],
+                         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
+                        (implicit redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)): InputDStream[StreamItem] = {
+    val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf)
+    val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel)
+    ssc.receiverStream(receiver)
+  }
 }
 
 trait RedisFunctions {
```
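
Because `redisConfig` is an implicit parameter defaulting to `RedisConfig.fromSparkConf`, a caller can point the stream at a different Redis by bringing an explicit config into scope. A minimal sketch, assuming `RedisEndpoint(host, port)` is this project's endpoint type accepted by the `RedisConfig` constructor:

```scala
// hypothetical override of the implicit default derived from the Spark conf
implicit val redisConfig: RedisConfig = new RedisConfig(RedisEndpoint("redis-host", 6379))

val items: InputDStream[StreamItem] =
  ssc.createRedisXStream(Seq(ConsumerConfig("events", "group-1", "consumer-1")))
```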

src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -12,6 +12,9 @@ import redis.clients.jedis._
 import scala.reflect.{ClassTag, classTag}
 import scala.util.control.NonFatal
 
+/**
+  * Receives messages from Redis List
+  */
 class RedisInputDStream[T: ClassTag](_ssc: StreamingContext,
                                      keys: Array[String],
                                      storageLevel: StorageLevel,
```
